From 52f1b14caf923fb444c794442e13b391ab2773f7 Mon Sep 17 00:00:00 2001 From: Jake Meiergerd Date: Wed, 31 Jan 2024 16:23:18 -0600 Subject: [PATCH] Reworked testing for all versions of the `ExpireAfter` operator, to improve functional coverage and cover various existing defects. (#821) --- .../Cache/ExpireAfter_Cache_ForSource.cs | 62 ++ .../Cache/ExpireAfter_Cache_ForStream.cs | 87 ++ .../List/ExpireAfter_List.cs | 53 + .../Cache/ExpireAfterFixture.ForSource.cs | 721 ++++++++++++++ .../Cache/ExpireAfterFixture.ForStream.cs | 914 ++++++++++++++++++ .../Cache/ExpireAfterFixture.cs | 115 +-- .../Cache/TimeExpiryFixture.cs | 115 --- .../List/ExpireAfterFixture.cs | 803 +++++++++++++-- .../Utilities/ObservableExtensions.cs | 10 + 9 files changed, 2590 insertions(+), 290 deletions(-) create mode 100644 src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForSource.cs create mode 100644 src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForStream.cs create mode 100644 src/DynamicData.Benchmarks/List/ExpireAfter_List.cs create mode 100644 src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs create mode 100644 src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs delete mode 100644 src/DynamicData.Tests/Cache/TimeExpiryFixture.cs diff --git a/src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForSource.cs b/src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForSource.cs new file mode 100644 index 000000000..84c2a17bb --- /dev/null +++ b/src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForSource.cs @@ -0,0 +1,62 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +using BenchmarkDotNet.Attributes; + +namespace DynamicData.Benchmarks.Cache; + +[MemoryDiagnoser] +[MarkdownExporterAttribute.GitHub] +public class ExpireAfter_Cache_ForSource +{ + public ExpireAfter_Cache_ForSource() + => _items = Enumerable + .Range(1, 1_000) + .Select(id => new Item() + { + Id = id + }) + .ToArray(); + + [Benchmark] + [Arguments(1, 0)] + [Arguments(1, 1)] + [Arguments(10, 0)] + [Arguments(10, 1)] + [Arguments(10, 10)] + [Arguments(100, 0)] + [Arguments(100, 1)] + [Arguments(100, 10)] + [Arguments(100, 100)] + [Arguments(1_000, 0)] + [Arguments(1_000, 1)] + [Arguments(1_000, 10)] + [Arguments(1_000, 100)] + [Arguments(1_000, 1_000)] + public void AddsRemovesAndFinalization(int addCount, int removeCount) + { + using var source = new SourceCache(static item => item.Id); + + using var subscription = source + .ExpireAfter( + timeSelector: static _ => TimeSpan.FromMinutes(60), + interval: null) + .Subscribe(); + + for (var i = 0; i < addCount; ++i) + source.AddOrUpdate(_items[i]); + + for (var i = 0; i < removeCount; ++i) + source.RemoveKey(_items[i].Id); + + subscription.Dispose(); + } + + private readonly IReadOnlyList _items; + + private sealed class Item + { + public int Id { get; init; } + } +} diff --git a/src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForStream.cs b/src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForStream.cs new file mode 100644 index 000000000..11784f891 --- /dev/null +++ b/src/DynamicData.Benchmarks/Cache/ExpireAfter_Cache_ForStream.cs @@ -0,0 +1,87 @@ +using System; +using System.Collections.Generic; +using System.Reactive.Subjects; + +using BenchmarkDotNet.Attributes; + +namespace DynamicData.Benchmarks.Cache; + +[MemoryDiagnoser] +[MarkdownExporterAttribute.GitHub] +public class ExpireAfter_Cache_ForStream +{ + public ExpireAfter_Cache_ForStream() + { + var additions = new List>(capacity: 1_000); + var removals = new List>(capacity: 1_000); + + for (var id = 1; id <= 1_000; ++id) + { + var item = new Item() + { + Id = id + }; + + additions.Add(new ChangeSet(capacity: 1) + { + new(reason: ChangeReason.Add, + key: id, + current: item) + }); + + removals.Add(new ChangeSet() + { + new(reason: ChangeReason.Remove, + key: item.Id, + current: item) + }); + } + + _additions = additions; + _removals = removals; + } + + [Benchmark] + [Arguments(1, 0)] + [Arguments(1, 1)] + [Arguments(10, 0)] + [Arguments(10, 1)] + [Arguments(10, 10)] + [Arguments(100, 0)] + [Arguments(100, 1)] + [Arguments(100, 10)] + [Arguments(100, 100)] + [Arguments(1_000, 0)] + [Arguments(1_000, 1)] + [Arguments(1_000, 10)] + [Arguments(1_000, 100)] + [Arguments(1_000, 1_000)] + public void AddsRemovesAndFinalization(int addCount, int removeCount) + { + using var source = new Subject>(); + + using var subscription = source + .ExpireAfter(static _ => TimeSpan.FromMinutes(60)) + .Subscribe(); + + var itemLifetime = TimeSpan.FromMilliseconds(1); + + var itemsToRemove = new List(); + + for (var i = 0; i < addCount; ++i) + source.OnNext(_additions[i]); + + for (var i = 0; i < removeCount; ++i) + source.OnNext(_removals[i]); + + subscription.Dispose(); + } + + private readonly IReadOnlyList> _additions; + private readonly IReadOnlyList> _removals; + + private sealed class Item + { + public int Id { get; init; } + } +} diff --git a/src/DynamicData.Benchmarks/List/ExpireAfter_List.cs b/src/DynamicData.Benchmarks/List/ExpireAfter_List.cs new file mode 100644 index 000000000..93911129d --- /dev/null +++ b/src/DynamicData.Benchmarks/List/ExpireAfter_List.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +using BenchmarkDotNet.Attributes; + +namespace DynamicData.Benchmarks.List; + +[MemoryDiagnoser] +[MarkdownExporterAttribute.GitHub] +public class ExpireAfter_List +{ + public ExpireAfter_List() + => _items = Enumerable + .Range(0, 1_000) + .Select(_ => new object()) + .ToArray(); + + [Benchmark] + [Arguments(1, 0)] + [Arguments(1, 1)] + [Arguments(10, 0)] + [Arguments(10, 1)] + [Arguments(10, 10)] + [Arguments(100, 0)] + [Arguments(100, 1)] + [Arguments(100, 10)] + [Arguments(100, 100)] + [Arguments(1_000, 0)] + [Arguments(1_000, 1)] + [Arguments(1_000, 10)] + [Arguments(1_000, 100)] + [Arguments(1_000, 1_000)] + public void AddsRemovesAndFinalization(int addCount, int removeCount) + { + using var source = new SourceList(); + + using var subscription = source + .ExpireAfter(static _ => TimeSpan.FromMinutes(60), pollingInterval: null) + .Subscribe(); + + for (var i = 0; i < addCount; ++i) + source.Add(_items[i]); + + var targetCount = addCount - removeCount; + while (source.Count > targetCount) + source.RemoveAt(0); + + subscription.Dispose(); + } + + private readonly IReadOnlyList _items; +} diff --git a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs new file mode 100644 index 000000000..77f38be68 --- /dev/null +++ b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs @@ -0,0 +1,721 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Threading.Tasks; + +using Bogus; +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; + +namespace DynamicData.Tests.Cache; + +public static partial class ExpireAfterFixture +{ + public sealed class ForSource + { + [Fact] + public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.AddOrUpdate(new[] { item1, item2, item3 }); + scheduler.AdvanceBy(1); + + var item4 = new Item() { Id = 4 }; + source.AddOrUpdate(item4); + scheduler.AdvanceBy(1); + + source.RemoveKey(2); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no items should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item1, item3, item4 }, "3 items were added, and one was removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item3 }.Select(item => new KeyValuePair(item.Id, item)), "items #1 and #3 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item4 }, "items #1 and #3 should have been removed"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + [Fact] + public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.AddOrUpdate(item1); + scheduler.AdvanceBy(1); + + // Extend the expiration to a later time + var item2 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + source.AddOrUpdate(item2); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item2 }, "item #1 was added, and then replaced"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item2 }, "no changes should have occurred"); + + // Shorten the expiration to an earlier time + var item3 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; + source.AddOrUpdate(item3); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item3 }, "item #1 was replaced"); + + // One more update with no changes to the expiration + var item4 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; + source.AddOrUpdate(item4); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item4 }, "item #1 was replaced"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item4 }.Select(item => new KeyValuePair(item.Id, item4)), "item #1 should have expired"); + source.Items.Should().BeEmpty("item #1 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEmpty("no changes should have occurred"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + [Fact] + public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + pollingInterval: TimeSpan.FromMilliseconds(20), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(30) }; + var item4 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(40) }; + var item5 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; + source.AddOrUpdate(new[] { item1, item2, item3, item4, item5 }); + scheduler.AdvanceBy(1); + + // Additional expirations at 20ms. + var item6 = new Item() { Id = 6, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + var item7 = new Item() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + source.AddOrUpdate(new[] { item6, item7 }); + scheduler.AdvanceBy(1); + + // Out-of-order expiration + var item8 = new Item() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; + source.AddOrUpdate(item8); + scheduler.AdvanceBy(1); + + // Non-expiring item + var item9 = new Item() { Id = 9 }; + source.AddOrUpdate(item9); + scheduler.AdvanceBy(1); + + // Replacement changing lifetime. + var item10 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; + source.AddOrUpdate(item10); + scheduler.AdvanceBy(1); + + // Replacement not-affecting lifetime. + var item11 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; + source.AddOrUpdate(item11); + scheduler.AdvanceBy(1); + + // Refresh should not affect scheduled expiration. + item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); + source.Refresh(item3); + scheduler.AdvanceBy(1); + + + // Verify initial state, after all emissions + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "9 items were added, 2 were replaced, and 1 was refreshed"); + + // Item scheduled to expire at 10ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "no changes should have occurred"); + + // Item scheduled to expire at 15ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item2, item6, item7, item8 }.Select(item => new KeyValuePair(item.Id, item)), "items #1, #2, #6, #7, and #8 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #1, #2, #6, #7, and #8 should have been removed"); + + // Item scheduled to expire at 30ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "no changes should have occurred"); + + // Expired items should be polled, but should exclude the one that was changed from 40ms to 45ms. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }.Select(item => new KeyValuePair(item.Id, item)), "item #3 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + + // Item scheduled to expire at 45ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(2).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(60).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }.Select(item => new KeyValuePair(item.Id, item)), "item #10 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have been removed"); + + // Expired items should be polled, but none should be found + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(80).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(3).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(100).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }.Select(item => new KeyValuePair(item.Id, item)), "item #11 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have been removed"); + + // Next poll should not find anything to expire. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(120).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, "no changes should have occurred"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + [Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")] + public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(30) }; + var item4 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(40) }; + var item5 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; + source.AddOrUpdate(new[] { item1, item2, item3, item4, item5 }); + scheduler.AdvanceBy(1); + + // Additional expirations at 20ms. + var item6 = new Item() { Id = 6, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + var item7 = new Item() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + source.AddOrUpdate(new[] { item6, item7 }); + scheduler.AdvanceBy(1); + + // Out-of-order expiration + var item8 = new Item() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; + source.AddOrUpdate(item8); + scheduler.AdvanceBy(1); + + // Non-expiring item + var item9 = new Item() { Id = 9 }; + source.AddOrUpdate(item9); + scheduler.AdvanceBy(1); + + // Replacement changing lifetime. + var item10 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; + source.AddOrUpdate(item10); + scheduler.AdvanceBy(1); + + // Replacement not-affecting lifetime. + var item11 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; + source.AddOrUpdate(item11); + scheduler.AdvanceBy(1); + + // Refresh should not affect scheduled expiration. + item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); + source.Refresh(item3); + scheduler.AdvanceBy(1); + + + // Verify initial state, after all emissions + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "11 items were added, 2 were replaced, and 1 was refreshed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }.Select(item => new KeyValuePair(item.Id, item)), "item #1 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item8, item9, item10, item11 }, "item #1 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item8 }.Select(item => new KeyValuePair(item.Id, item)), "item #8 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item9, item10, item11 }, "item #8 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item2, item6, item7 }.Select(item => new KeyValuePair(item.Id, item)), "items #2, #6, and #7 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #2, #6, and #7 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }.Select(item => new KeyValuePair(item.Id, item)), "item #3 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "no changes should have occurred"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(4).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }.Select(item => new KeyValuePair(item.Id, item)), "item #10 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(50).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(5).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(5).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }.Select(item => new KeyValuePair(item.Id, item)), "item #11 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired"); + + // Remaining item should never expire + scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(6).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, "no changes should have occurred"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + // Covers https://github.com/reactivemarbles/DynamicData/issues/716 + [Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")] + public void SchedulerIsInaccurate_RemovalsAreNotSkipped() + { + using var source = CreateTestSource(); + + var scheduler = new FakeScheduler() + { + Now = DateTimeOffset.FromUnixTimeMilliseconds(0) + }; + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.AddOrUpdate(item1); + + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); + + // Simulate the scheduler invoking all actions 1ms early. + while(scheduler.ScheduledActions.Count is not 0) + { + if (scheduler.ScheduledActions[0].DueTime is DateTimeOffset dueTime) + scheduler.Now = dueTime - TimeSpan.FromMilliseconds(1); + + scheduler.ScheduledActions[0].Invoke(); + scheduler.ScheduledActions.RemoveAt(0); + } + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }.Select(item => new KeyValuePair(item.Id, item)), "item #1 should have expired"); + source.Items.Should().BeEmpty("item #1 should have been removed"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + [Fact(Skip = "Existing defect, completion is not propagated from the source")] + public void SourceCompletes_CompletionIsPropagated() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + source.AddOrUpdate(new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); + scheduler.AdvanceBy(1); + + source.Complete(); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeTrue(); + + // Ensure that the operator does not attept to continue removing items. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + [Fact(Skip = "Existing defect, completion is not propagated from the source")] + public void SourceCompletesImmediately_CompletionIsPropagated() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.AddOrUpdate(item1); + scheduler.AdvanceBy(1); + + source.Complete(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeTrue(); + source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, operator does not use safe subscriptions")] + public void SourceErrors_ErrorIsPropagated() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + source.AddOrUpdate(new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); + scheduler.AdvanceBy(1); + + var error = new Exception("This is a test"); + source.SetError(error); + + results.TryGetRecordedError().Should().Be(error, "an error was published"); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeFalse(); + + // Ensure that the operator does not attept to continue removing items. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + [Fact(Skip = "Existing defect, immediately-occuring error is not propagated")] + public void SourceErrorsImmediately_ErrorIsPropagated() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.AddOrUpdate(item1); + scheduler.AdvanceBy(1); + + var error = new Exception("This is a test"); + source.SetError(error); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + results.TryGetRecordedError().Should().Be(error, "an error was published"); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeFalse(); + source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); + + // Ensure that the operator does not attept to continue removing items. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.ExpireAfter( + source: (null as ISourceCache)!, + timeSelector: static _ => default, + interval: null)) + .Should().Throw(); + + [Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")] + public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe() + { + using var source = CreateTestSource(); + + var scheduler = ThreadPoolScheduler.Instance; + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var maxExpiration = PerformStressEdits( + source: source, + scheduler: scheduler, + stressCount: 10_000, + minItemLifetime: TimeSpan.FromMilliseconds(10), + maxItemLifetime: TimeSpan.FromMilliseconds(50), + maxChangeCount: 10); + + await Observable.Timer(maxExpiration + TimeSpan.FromMilliseconds(100), scheduler); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(static pair => pair.Value.Expiration.Should().NotBeNull("only items with an expiration should have expired")); + results.TryGetRecordedCompletion().Should().BeFalse(); + source.Items.Should().AllSatisfy(item => item.Expiration.Should().BeNull("all items with an expiration should have expired")); + } + + [Fact] + public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() + { + using var source = CreateTestSource(); + + var scheduler = ThreadPoolScheduler.Instance; + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + pollingInterval: TimeSpan.FromMilliseconds(10), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var maxExpiration = PerformStressEdits( + source: source, + scheduler: scheduler, + stressCount: 10_000, + minItemLifetime: TimeSpan.FromMilliseconds(10), + maxItemLifetime: TimeSpan.FromMilliseconds(50), + maxChangeCount: 10); + + await Observable.Timer(maxExpiration + TimeSpan.FromMilliseconds(100), scheduler); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(pair => pair.Value.Expiration.Should().NotBeNull("only items with an expiration should have expired")); + results.TryGetRecordedCompletion().Should().BeFalse(); + source.Items.Should().AllSatisfy(item => item.Expiration.Should().BeNull("all items with an expiration should have expired")); + } + + [Fact] + public void TimeSelectorIsNull_ThrowsException() + => FluentActions.Invoking(() => CreateTestSource().ExpireAfter( + timeSelector: null!, + interval: null)) + .Should().Throw(); + + [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")] + public void TimeSelectorThrows_ErrorIsPropagated() + { + using var source = CreateTestSource(); + + var scheduler = CreateTestScheduler(); + + var error = new Exception("This is a test."); + + using var subscription = source + .ExpireAfter( + timeSelector: _ => throw error, + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + source.AddOrUpdate(new Item() { Id = 1 }); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().Be(error); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeFalse(); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + private static TestSourceCache CreateTestSource() + => new(static item => item.Id); + + private static DateTimeOffset PerformStressEdits( + ISourceCache source, + IScheduler scheduler, + int stressCount, + TimeSpan minItemLifetime, + TimeSpan maxItemLifetime, + int maxChangeCount) + { + var nextItemId = 1; + var randomizer = new Randomizer(1234567); + var maxExpiration = DateTimeOffset.MinValue; + + for (var i = 0; i < stressCount; ++i) + source.Edit(mutator => + { + var changeCount = randomizer.Int(1, maxChangeCount); + + for (var i = 0; i < changeCount; ++i) + { + var changeReason = (mutator.Count is 0) + ? ChangeReason.Add + : randomizer.Enum(exclude: ChangeReason.Moved); + + if (changeReason is ChangeReason.Add) + { + mutator.AddOrUpdate(new Item() + { + Id = nextItemId++, + Expiration = GenerateExpiration() + }); + continue; + } + + var key = randomizer.CollectionItem((ICollection)mutator.Keys); + + switch (changeReason) + { + case ChangeReason.Refresh: + mutator.Refresh(key); + break; + + case ChangeReason.Remove: + mutator.RemoveKey(key); + break; + + case ChangeReason.Update: + source.AddOrUpdate(new Item() + { + Id = key, + Expiration = GenerateExpiration() + }); + break; + } + } + }); + + return maxExpiration; + + DateTimeOffset? GenerateExpiration() + { + if (randomizer.Bool()) + return null; + + var expiration = scheduler.Now + randomizer.TimeSpan(minItemLifetime, maxItemLifetime); + if (expiration > maxExpiration) + maxExpiration = expiration; + + return expiration; + } + } + } +} diff --git a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs new file mode 100644 index 000000000..316a4ad5d --- /dev/null +++ b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs @@ -0,0 +1,914 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading.Tasks; + +using Bogus; +using FluentAssertions; +using Xunit; + +using DynamicData.Tests.Utilities; +using System.Reactive.Disposables; + +namespace DynamicData.Tests.Cache; + +public static partial class ExpireAfterFixture +{ + public sealed class ForStream + { + [Fact] + public void ExpiredItemIsRemoved_RemovalIsSkipped() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(30) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1), + new(reason: ChangeReason.Add, key: item2.Id, current: item2), + new(reason: ChangeReason.Add, key: item3.Id, current: item3), + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 source operation was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have been removed"); + + // Send a notification to remove an item that's already been removed + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Remove, key: item1.Id, current: item1), + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Skip(2).Should().BeEmpty("no changes should have occurred"); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1), + new(reason: ChangeReason.Add, key: item2.Id, current: item2), + new(reason: ChangeReason.Add, key: item3.Id, current: item3), + }); + scheduler.AdvanceBy(1); + + var item4 = new Item() { Id = 4 }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item4.Id, current: item4) + }); + scheduler.AdvanceBy(1); + + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Remove, key: item2.Id, current: item2) + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(3, "3 source operations were performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item3, item4 }, "3 items were added, and one was removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "items #1 and #3 should have been removed"); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1) + }); + scheduler.AdvanceBy(1); + + // Extend the expiration to a later time + var item2 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Update, key: item2.Id, current: item2, previous: item1) + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(2, "2 source operations were performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "item #1 was added, and then replaced"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(2).Should().BeEmpty("no expirations should have occurred"); + + // Shorten the expiration to an earlier time + var item3 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Update, key: item3.Id, current: item3, previous: item2) + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item3 }, "item #1 was replaced"); + + // One more update with no changes to the expiration + var item4 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Update, key: item4.Id, current: item4, previous: item3) + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Skip(3).Count().Should().Be(1, "1 source operation was performed."); + results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "item #1 was replaced"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEmpty("item #1 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(5).Should().BeEmpty("no expirations should have occurred"); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + pollingInterval: TimeSpan.FromMilliseconds(20), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(30) }; + var item4 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(40) }; + var item5 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1), + new(reason: ChangeReason.Add, key: item2.Id, current: item2), + new(reason: ChangeReason.Add, key: item3.Id, current: item3), + new(reason: ChangeReason.Add, key: item4.Id, current: item4), + new(reason: ChangeReason.Add, key: item5.Id, current: item5) + }); + scheduler.AdvanceBy(1); + + // Additional expirations at 20ms. + var item6 = new Item() { Id = 6, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + var item7 = new Item() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item6.Id, current: item6), + new(reason: ChangeReason.Add, key: item7.Id, current: item7) + }); + scheduler.AdvanceBy(1); + + // Out-of-order expiration + var item8 = new Item() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item8.Id, current: item8) + }); + scheduler.AdvanceBy(1); + + // Non-expiring item + var item9 = new Item() { Id = 9 }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item9.Id, current: item9) + }); + scheduler.AdvanceBy(1); + + // Replacement changing lifetime. + var item10 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Update, key: item10.Id, current: item10, previous: item4) + }); + scheduler.AdvanceBy(1); + + // Replacement not-affecting lifetime. + var item11 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Update, key: item11.Id, current: item11, previous: item5) + }); + scheduler.AdvanceBy(1); + + // Refresh should not affect scheduled expiration. + item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Refresh, key: item3.Id, current: item3) + }); + scheduler.AdvanceBy(1); + + + // Verify initial state, after all emissions + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(7, "7 source operations were performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "9 items were added, 2 were replaced, and 1 was refreshed"); + + // Item scheduled to expire at 10ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(7).Should().BeEmpty("no changes should have occurred"); + + // Item scheduled to expire at 15ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(7).Should().BeEmpty("no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(7).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #1, #2, #6, #7, and #8 should have been removed"); + + // Item scheduled to expire at 30ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(8).Should().BeEmpty("no changes should have occurred"); + + // Expired items should be polled, but should exclude the one that was changed from 40ms to 45ms. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(8).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + + // Item scheduled to expire at 45ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(9).Should().BeEmpty("no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(60).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(9).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have been removed"); + + // Expired items should be polled, but none should be found + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(80).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(10).Should().BeEmpty("no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(100).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(10).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have been removed"); + + // Next poll should not find anything to expire. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(120).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(11).Should().BeEmpty("no changes should have occurred"); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")] + public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(30) }; + var item4 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(40) }; + var item5 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1), + new(reason: ChangeReason.Add, key: item2.Id, current: item2), + new(reason: ChangeReason.Add, key: item3.Id, current: item3), + new(reason: ChangeReason.Add, key: item4.Id, current: item4), + new(reason: ChangeReason.Add, key: item5.Id, current: item5) + }); + scheduler.AdvanceBy(1); + + // Additional expirations at 20ms. + var item6 = new Item() { Id = 6, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + var item7 = new Item() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item6.Id, current: item6), + new(reason: ChangeReason.Add, key: item7.Id, current: item7) + }); + scheduler.AdvanceBy(1); + + // Out-of-order expiration + var item8 = new Item() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item8.Id, current: item8) + }); + scheduler.AdvanceBy(1); + + // Non-expiring item + var item9 = new Item() { Id = 9 }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item9.Id, current: item9) + }); + scheduler.AdvanceBy(1); + + // Replacement changing lifetime. + var item10 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Update, key: item10.Id, current: item10, previous: item4) + }); + scheduler.AdvanceBy(1); + + // Replacement not-affecting lifetime. + var item11 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Update, key: item11.Id, current: item11, previous: item5) + }); + scheduler.AdvanceBy(1); + + // Refresh should not affect scheduled expiration. + item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Refresh, key: item3.Id, current: item3) + }); + scheduler.AdvanceBy(1); + + + // Verify initial state, after all emissions + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(7, "7 source operations were performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "11 items were added, 2 were replaced, and 1 was refreshed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(7).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item8, item9, item10, item11 }, "item #1 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(8).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item9, item10, item11 }, "item #8 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(9).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #2, #6, and #7 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(10).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(11).Should().BeEmpty("no changes should have occurred"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(12).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(50).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(12).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired"); + + // Remaining item should never expire + scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(13).Should().BeEmpty("no changes should have occurred"); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact(Skip = "Existing defect, completion does not wait")] + public void RemovalsArePending_CompletionWaitsForRemovals() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1), + new(reason: ChangeReason.Add, key: item2.Id, current: item2), + new(reason: ChangeReason.Add, key: item3.Id, current: item3) + }); + scheduler.AdvanceBy(1); + + // Verify initial state, after all emissions + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 source operation was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have been removed"); + + source.OnCompleted(); + + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeFalse("removals are pending"); + results.Messages.Skip(2).Should().BeEmpty("no changes should have occurred"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); + + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeTrue(); + results.Messages.Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "item #3 should have expired"); + + results.IsCompleted.Should().BeFalse(); + } + + // Covers https://github.com/reactivemarbles/DynamicData/issues/716 + [Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")] + public void SchedulerIsInaccurate_RemovalsAreNotSkipped() + { + using var source = new Subject>(); + + var scheduler = new FakeScheduler() + { + Now = DateTimeOffset.FromUnixTimeMilliseconds(0) + }; + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1) + }); + + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 source operation was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); + + // Simulate the scheduler invoking all actions 1ms early. + while(scheduler.ScheduledActions.Count is not 0) + { + if (scheduler.ScheduledActions[0].DueTime is DateTimeOffset dueTime) + scheduler.Now = dueTime - TimeSpan.FromMilliseconds(1); + + scheduler.ScheduledActions[0].Invoke(); + scheduler.ScheduledActions.RemoveAt(0); + } + + results.Error.Should().BeNull(); + results.Messages.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.Data.Items.Should().BeEmpty("item #1 should have been removed"); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public void SourceCompletes_CompletionIsPropagated() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1), + new(reason: ChangeReason.Add, key: item2.Id, current: item2), + new(reason: ChangeReason.Add, key: item3.Id, current: item3) + }); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 source operation was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + + source.OnCompleted(); + + results.Error.Should().BeNull(); + results.Messages.Skip(1).Should().BeEmpty("no changes should have occurred"); + + results.IsCompleted.Should().BeTrue(); + } + + [Fact] + public void SourceCompletesImmediately_CompletionIsPropagated() + { + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + + var source = Observable.Create>(observer => + { + observer.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1), + new(reason: ChangeReason.Add, key: item2.Id, current: item2), + new(reason: ChangeReason.Add, key: item3.Id, current: item3) + }); + + observer.OnCompleted(); + + return Disposable.Empty; + }); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 source operation was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + + results.IsCompleted.Should().BeTrue(); + } + + [Fact] + public void SourceErrors_ErrorIsPropagated() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: item => item.Expiration - scheduler.Now, + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1) + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 source operations was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); + + var error = new Exception("This is a test"); + source.OnError(error); + + results.Error.Should().Be(error); + results.Messages.Skip(1).Should().BeEmpty("no changes should have occurred"); + results.IsCompleted.Should().BeFalse(); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Messages.Skip(1).Should().BeEmpty("notifications should not get published after an error"); + } + + [Fact] + public void SourceErrorsImmediately_ErrorIsPropagated() + { + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + + var error = new Exception("This is a test"); + + var source = Observable.Create>(observer => + { + observer.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1) + }); + + observer.OnError(error); + + return Disposable.Empty; + }); + + var scheduler = CreateTestScheduler(); + + using var results = source + .ExpireAfter( + timeSelector: item => item.Expiration - scheduler.Now, + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + results.Error.Should().Be(error); + results.Messages.Count.Should().Be(1, "1 source operations was performed"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); + results.IsCompleted.Should().BeFalse(); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.Messages.Skip(1).Should().BeEmpty("notifications should not get published after an error"); + } + + [Fact] + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableCacheEx.ExpireAfter( + source: (null as IObservable>)!, + timeSelector: static _ => default)) + .Should().Throw(); + + [Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")] + public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe() + { + using var source = new Subject>(); + + var scheduler = ThreadPoolScheduler.Instance; + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var maxExpiration = PublishStressChangeSets( + source: source, + scheduler: scheduler, + stressCount: 10_000, + minItemLifetime: TimeSpan.FromMilliseconds(10), + maxItemLifetime: TimeSpan.FromMilliseconds(50), + maxChangeCount: 10); + + await Observable.Timer(maxExpiration + TimeSpan.FromMilliseconds(100), scheduler); + + results.Error.Should().BeNull(); + results.Messages.SelectMany(static changeSet => changeSet.Where(change => change.Reason is ChangeReason.Remove)).Should().AllSatisfy(static change => change.Current.Expiration.Should().NotBeNull("only items with an expiration should have expired")); + results.Data.Items.Should().AllSatisfy(item => item.Expiration.Should().BeNull("all items with an expiration should have expired")); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() + { + using var source = new Subject>(); + + var scheduler = ThreadPoolScheduler.Instance; + + using var results = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + pollingInterval: TimeSpan.FromMilliseconds(10), + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var maxExpiration = PublishStressChangeSets( + source: source, + scheduler: scheduler, + stressCount: 10_000, + minItemLifetime: TimeSpan.FromMilliseconds(10), + maxItemLifetime: TimeSpan.FromMilliseconds(50), + maxChangeCount: 10); + + await Observable.Timer(maxExpiration + TimeSpan.FromMilliseconds(100), scheduler); + + var now = scheduler.Now; + + results.Error.Should().BeNull(); + results.Data.Items.Should().AllSatisfy(item => item.Expiration.Should().BeNull("all items with an expiration should have expired")); + + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public void TimeSelectorIsNull_ThrowsException() + => FluentActions.Invoking(() => new Subject>().ExpireAfter( + timeSelector: null!)) + .Should().Throw(); + + [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")] + public void TimeSelectorThrows_SubscriptionReceivesError() + { + using var source = new Subject>(); + + var scheduler = CreateTestScheduler(); + + var error = new Exception("This is a test."); + + using var results = source + .ExpireAfter( + timeSelector: _ => throw error, + scheduler: scheduler) + .ValidateSynchronization() + .AsAggregator(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.OnNext(new ChangeSet() + { + new(reason: ChangeReason.Add, key: item1.Id, current: item1) + }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Should().BeEmpty("no source operations should have been processed"); + results.IsCompleted.Should().BeFalse(); + } + + private static DateTimeOffset PublishStressChangeSets( + IObserver> source, + IScheduler scheduler, + int stressCount, + TimeSpan minItemLifetime, + TimeSpan maxItemLifetime, + int maxChangeCount) + { + var nextItemId = 1; + var randomizer = new Randomizer(1234567); + var maxExpiration = DateTimeOffset.MinValue; + + var cache = new ChangeAwareCache(); + + for (var i = 0; i < stressCount; ++i) + { + var changeCount = randomizer.Int(1, maxChangeCount); + + for (var j = 0; j < changeCount; ++j) + { + var changeReason = (cache.Count is 0) + ? ChangeReason.Add + : randomizer.Enum(exclude: ChangeReason.Moved); + + if (changeReason is ChangeReason.Add) + { + var item = new Item() + { + Id = nextItemId++, + Expiration = GenerateExpiration() + }; + + cache.AddOrUpdate(item, item.Id); + continue; + } + + var key = randomizer.CollectionItem((ICollection)cache.Keys); + + switch (changeReason) + { + case ChangeReason.Refresh: + cache.Refresh(key); + break; + + case ChangeReason.Remove: + cache.Remove(key); + break; + + case ChangeReason.Update: + var item = new Item() + { + Id = key, + Expiration = GenerateExpiration() + }; + + cache.AddOrUpdate(item, item.Id); + break; + } + } + + source.OnNext(cache.CaptureChanges()); + } + + return maxExpiration; + + DateTimeOffset? GenerateExpiration() + { + if (randomizer.Bool()) + return null; + + var expiration = scheduler.Now + randomizer.TimeSpan(minItemLifetime, maxItemLifetime); + if (expiration > maxExpiration) + maxExpiration = expiration; + + return expiration; + } + } + } +} diff --git a/src/DynamicData.Tests/Cache/ExpireAfterFixture.cs b/src/DynamicData.Tests/Cache/ExpireAfterFixture.cs index 7fb27b4e1..36eb30183 100644 --- a/src/DynamicData.Tests/Cache/ExpireAfterFixture.cs +++ b/src/DynamicData.Tests/Cache/ExpireAfterFixture.cs @@ -1,120 +1,27 @@ using System; -using System.Linq; - -using DynamicData.Tests.Domain; - -using FluentAssertions; +using System.Reactive.Concurrency; using Microsoft.Reactive.Testing; -using Xunit; - namespace DynamicData.Tests.Cache; -public class ExpireAfterFixture : IDisposable +public static partial class ExpireAfterFixture { - private readonly ChangeSetAggregator _results; - - private readonly TestScheduler _scheduler; - - private readonly ISourceCache _source; - - public ExpireAfterFixture() + private static TestScheduler CreateTestScheduler() { - _scheduler = new TestScheduler(); - _source = new SourceCache(p => p.Key); - _results = _source.Connect().AsAggregator(); - } + var scheduler = new TestScheduler(); + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(0).Ticks); - [Fact] - public void CanHandleABatchOfUpdates() - { - var remover = _source.ExpireAfter(p => TimeSpan.FromMilliseconds(100), _scheduler).Subscribe(); - const int size = 100; - var items = Enumerable.Range(1, size).Select(i => new Person($"Name.{i}", i)).ToArray(); - - _source.AddOrUpdate(items); - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200).Ticks); - remover.Dispose(); - - _results.Data.Count.Should().Be(0, "Should be no data in the cache"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(100, "Should be 100 adds in the first message"); - _results.Messages[1].Removes.Should().Be(100, "Should be 100 removes in the second message"); + return scheduler; } - [Fact] - public void ComplexRemove() - { - TimeSpan? RemoveFunc(Person t) - { - if (t.Age <= 40) - { - return TimeSpan.FromSeconds(5); - } - - if (t.Age <= 80) - { - return TimeSpan.FromSeconds(7); - } - - return null; - } - - const int size = 100; - var items = Enumerable.Range(1, size).Select(i => new Person($"Name.{i}", i)).ToArray(); - _source.AddOrUpdate(items); - - var remover = _source.ExpireAfter(RemoveFunc, _scheduler).Subscribe(); - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(5010).Ticks); + private static Func CreateTimeSelector(IScheduler scheduler) + => item => item.Expiration - scheduler.Now; - _source.Count.Should().Be(60, "40 items should have been removed from the cache"); - - _scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks); - _source.Count.Should().Be(20, "80 items should have been removed from the cache"); - - remover.Dispose(); - } - - public void Dispose() - { - _results.Dispose(); - _source.Dispose(); - } - - [Fact] - public void ExpireIsCancelledWhenUpdated() + private class Item { - var remover = _source.ExpireAfter(p => TimeSpan.FromMilliseconds(100), _scheduler).Subscribe(); - - _source.Edit( - updater => - { - updater.AddOrUpdate(new Person("Name1", 20)); - updater.AddOrUpdate(new Person("Name1", 21)); - }); - - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200).Ticks); - remover.Dispose(); - _results.Data.Count.Should().Be(0, "Should be no data in the cache"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 add in the first message"); - _results.Messages[0].Updates.Should().Be(1, "Should be 1 update in the first message"); - _results.Messages[1].Removes.Should().Be(1, "Should be 1 remove in the second message"); - } - - [Fact] - public void ItemAddedIsExpired() - { - var remover = _source.ExpireAfter(p => TimeSpan.FromMilliseconds(100), _scheduler).Subscribe(); - - _source.AddOrUpdate(new Person("Name1", 10)); - - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200).Ticks); - remover.Dispose(); + public required int Id { get; init; } - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 adds in the first update"); - _results.Messages[1].Removes.Should().Be(1, "Should be 1 removes in the second update"); + public DateTimeOffset? Expiration { get; set; } } } diff --git a/src/DynamicData.Tests/Cache/TimeExpiryFixture.cs b/src/DynamicData.Tests/Cache/TimeExpiryFixture.cs deleted file mode 100644 index 6651deb10..000000000 --- a/src/DynamicData.Tests/Cache/TimeExpiryFixture.cs +++ /dev/null @@ -1,115 +0,0 @@ -using System; -using System.Linq; - -using DynamicData.Tests.Domain; - -using FluentAssertions; - -using Microsoft.Reactive.Testing; - -using Xunit; - -namespace DynamicData.Tests.Cache; - -public class TimeExpiryFixture : IDisposable -{ - private readonly ISourceCache _cache; - - private readonly IDisposable _remover; - - private readonly ChangeSetAggregator _results; - - private readonly TestScheduler _scheduler; - - public TimeExpiryFixture() - { - _scheduler = new TestScheduler(); - - _cache = new SourceCache(p => p.Key); - _results = new ChangeSetAggregator(_cache.Connect()); - _remover = _cache.ExpireAfter(p => TimeSpan.FromMilliseconds(100), _scheduler).Subscribe(); - } - - [Fact] - public void AutoRemove() - { - TimeSpan? RemoveFunc(Person t) - { - if (t.Age < 40) - { - return TimeSpan.FromSeconds(4); - } - - if (t.Age < 80) - { - return TimeSpan.FromSeconds(7); - } - - return null; - } - - const int size = 100; - var items = Enumerable.Range(1, size).Select(i => new Person($"Name{i}", i)).ToArray(); - _cache.AddOrUpdate(items); - - var xxx = _cache.ExpireAfter(RemoveFunc, _scheduler).Subscribe(); - _scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks); - - _scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks); - - xxx.Dispose(); - } - - [Fact] - public void CanHandleABatchOfUpdates() - { - const int size = 100; - var items = Enumerable.Range(1, size).Select(i => new Person($"Name.{i}", i)).ToArray(); - - _cache.AddOrUpdate(items); - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(150).Ticks); - - _results.Data.Count.Should().Be(0, "Should be no data in the cache"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(100, "Should be 100 adds in the first message"); - _results.Messages[1].Removes.Should().Be(100, "Should be 100 removes in the second message"); - } - - public void Dispose() - { - _results.Dispose(); - _remover.Dispose(); - _cache.Dispose(); - } - - [Fact] - public void ExpireIsCancelledWhenUpdated() - { - _cache.Edit( - updater => - { - updater.AddOrUpdate(new Person("Name1", 20)); - updater.AddOrUpdate(new Person("Name1", 21)); - }); - - _scheduler.AdvanceBy(TimeSpan.FromSeconds(150).Ticks); - - _results.Data.Count.Should().Be(0, "Should be no data in the cache"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 add in the first message"); - _results.Messages[0].Updates.Should().Be(1, "Should be 1 update in the first message"); - _results.Messages[1].Removes.Should().Be(1, "Should be 1 remove in the second message"); - } - - [Fact] - public void ItemAddedIsExpired() - { - _cache.AddOrUpdate(new Person("Name1", 10)); - - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(150).Ticks); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 adds in the first update"); - _results.Messages[1].Removes.Should().Be(1, "Should be 1 removes in the second update"); - } -} diff --git a/src/DynamicData.Tests/List/ExpireAfterFixture.cs b/src/DynamicData.Tests/List/ExpireAfterFixture.cs index 437fbecfd..18e224f0b 100644 --- a/src/DynamicData.Tests/List/ExpireAfterFixture.cs +++ b/src/DynamicData.Tests/List/ExpireAfterFixture.cs @@ -1,120 +1,781 @@ using System; using System.Linq; - -using DynamicData.Tests.Domain; - -using FluentAssertions; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Threading.Tasks; using Microsoft.Reactive.Testing; +using Bogus; +using FluentAssertions; using Xunit; +using DynamicData.Tests.Utilities; +using System.Collections.Generic; + namespace DynamicData.Tests.List; -public class ExpireAfterFixture : IDisposable +public sealed class ExpireAfterFixture { - private readonly ChangeSetAggregator _results; + [Fact] + public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() + { + using var source = new TestSourceList(); - private readonly TestScheduler _scheduler; + var scheduler = CreateTestScheduler(); - private readonly ISourceList _source; + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); - public ExpireAfterFixture() - { - _scheduler = new TestScheduler(); - _source = new SourceList(); - _results = _source.Connect().AsAggregator(); + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.AddRange(new[] { item1, item2, item3 }); + scheduler.AdvanceBy(1); + + var item4 = new Item() { Id = 4 }; + source.Add(item4); + scheduler.AdvanceBy(1); + + source.Remove(item2); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no items should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item1, item3, item4 }, "3 items were added, and one was removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item3 }, "items #1 and #3 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item4 }, "items #1 and #3 should have been removed"); + + results.TryGetRecordedCompletion().Should().BeFalse(); } [Fact] - public void CanHandleABatchOfUpdates() + public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() { - var remover = _source.ExpireAfter(p => TimeSpan.FromMilliseconds(100), _scheduler).Subscribe(); - const int size = 100; - var items = Enumerable.Range(1, size).Select(i => new Person($"Name.{i}", i)).ToArray(); - - _source.AddRange(items); - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200).Ticks); - remover.Dispose(); - - _results.Data.Count.Should().Be(0, "Should be no data in the cache"); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(100, "Should be 100 adds in the first message"); - _results.Messages[1].Removes.Should().Be(100, "Should be 100 removes in the second message"); + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.Add(item1); + scheduler.AdvanceBy(1); + + // Extend the expiration to a later time + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + source.Replace(item1, item2); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item2 }, "item #1 was added, and then replaced"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item2 }, "no changes should have occurred"); + + // Shorten the expiration to an earlier time + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; + source.Replace(item2, item3); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item3 }, "item #2 was replaced"); + + // One more update with no changes to the expiration + var item4 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; + source.Replace(item3, item4); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item4 }, "item #3 was replaced"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item4 }, "item #4 should have expired"); + source.Items.Should().BeEmpty("item #4 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEmpty("no changes should have occurred"); + + results.TryGetRecordedCompletion().Should().BeFalse(); } - [Fact] - public void ComplexRemove() + [Fact(Skip = "Existing defect, operator emits empty sets of expired items, instead of skipping emission")] + public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() + { + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + pollingInterval: TimeSpan.FromMilliseconds(20), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(30) }; + var item4 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(40) }; + var item5 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; + source.AddRange(new[] { item1, item2, item3, item4, item5 }); + scheduler.AdvanceBy(1); + + // Additional expirations at 20ms. + var item6 = new Item() { Id = 6, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + var item7 = new Item() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + source.AddRange(new[] { item6, item7 }); + scheduler.AdvanceBy(1); + + // Out-of-order expiration + var item8 = new Item() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; + source.Add(item8); + scheduler.AdvanceBy(1); + + // Non-expiring item + var item9 = new Item() { Id = 9 }; + source.Add(item9); + scheduler.AdvanceBy(1); + + // Replacement changing lifetime. + var item10 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; + source.Replace(item4, item10); + scheduler.AdvanceBy(1); + + // Replacement not-affecting lifetime. + var item11 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; + source.Replace(item5, item11); + scheduler.AdvanceBy(1); + + // Move should not affect scheduled expiration. + item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); + source.Move(2, 3); + scheduler.AdvanceBy(1); + + + // Verify initial state, after all emissions + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "9 items were added, 2 were replaced, and 1 was refreshed"); + + // Item scheduled to expire at 10ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + // Item scheduled to expire at 15ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item2, item6, item7, item8 }, "items #1, #2, #6, #7, and #8 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item10, item3, item11, item9 }, options => options.WithStrictOrdering(), "items #1, #2, #6, #7, and #8 should have been removed"); + + // Item scheduled to expire at 30ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item10, item3, item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + // Expired items should be polled, but should exclude the one that was changed from 40ms to 45ms. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }, "item #3 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "item #3 should have been removed"); + + // Item scheduled to expire at 45ms, but won't be picked up yet + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(2).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(60).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }, "item #10 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item11, item9 }, "item #10 should have been removed"); + + // Expired items should be polled, but none should be found + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(80).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(3).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + // Expired items should be polled + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(100).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }, "item #11 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "item #11 should have been removed"); + + // Next poll should not find anything to expire. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(120).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + [Fact(Skip = "Existing defect, very minor defect, items defined to never expire actually do, at DateTimeOffset.MaxValue")] + public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() + { + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + var item2 = new Item() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; + var item3 = new Item() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(30) }; + var item4 = new Item() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(40) }; + var item5 = new Item() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; + source.AddRange(new[] { item1, item2, item3, item4, item5 }); + scheduler.AdvanceBy(1); + + // Additional expirations at 20ms. + var item6 = new Item() { Id = 6, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + var item7 = new Item() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; + source.AddRange(new[] { item6, item7 }); + scheduler.AdvanceBy(1); + + // Out-of-order expiration + var item8 = new Item() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; + source.Add(item8); + scheduler.AdvanceBy(1); + + // Non-expiring item + var item9 = new Item() { Id = 9 }; + source.Add(item9); + scheduler.AdvanceBy(1); + + // Replacement changing lifetime. + var item10 = new Item() { Id = 10, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; + source.Replace(item4, item10); + scheduler.AdvanceBy(1); + + // Replacement not-affecting lifetime. + var item11 = new Item() { Id = 11, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; + source.Replace(item5, item11); + scheduler.AdvanceBy(1); + + // Move should not affect scheduled expiration. + item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); + source.Move(2, 3); + scheduler.AdvanceBy(1); + + + // Verify initial state, after all emissions + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "9 items were added, 2 were replaced, and 1 was moved"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }, "item #1 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "item #1 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item8 }, "item #8 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item2, item10, item3, item11, item6, item7, item9 }, options => options.WithStrictOrdering(), "item #8 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item2, item6, item7 }, "items #2, #6, and #7 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item10, item3, item11, item9 }, options => options.WithStrictOrdering(), "items #2, #6, and #7 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }, "item #3 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "item #3 should have been removed"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(4).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }, "item #10 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item11, item9 }, options => options.WithStrictOrdering(), "item #10 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(50).Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(5).Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().Skip(5).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }, "item #11 should have expired"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "item #11 should have expired"); + + scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Skip(6).Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + // Covers https://github.com/reactivemarbles/DynamicData/issues/716 + [Fact(Skip = "Existing defect, removals are skipped when scheduler invokes early")] + public void SchedulerIsInaccurate_RemovalsAreNotSkipped() { - TimeSpan? RemoveFunc(Person t) + using var source = new TestSourceList(); + + var scheduler = new FakeScheduler() { - if (t.Age <= 40) - { - return TimeSpan.FromSeconds(5); - } + Now = DateTimeOffset.FromUnixTimeMilliseconds(0) + }; - if (t.Age <= 80) - { - return TimeSpan.FromSeconds(7); - } + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.Add(item1); + + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + source.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); - return null; + // Simulate the scheduler invoking all actions 1ms early. + while(scheduler.ScheduledActions.Count is not 0) + { + if (scheduler.ScheduledActions[0].DueTime is DateTimeOffset dueTime) + scheduler.Now = dueTime - TimeSpan.FromMilliseconds(1); + + scheduler.ScheduledActions[0].Invoke(); + scheduler.ScheduledActions.RemoveAt(0); } - const int size = 100; - var items = Enumerable.Range(1, size).Select(i => new Person($"Name.{i}", i)).ToArray(); - _source.AddRange(items); + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); + results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }, "item #1 should have expired"); + source.Items.Should().BeEmpty("item #1 should have been removed"); + + results.TryGetRecordedCompletion().Should().BeFalse(); + } + + [Fact(Skip = "Existing defect, completion is not propagated from the source")] + public void SourceCompletes_CompletionIsPropagated() + { + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + source.Add(new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); + scheduler.AdvanceBy(1); + + source.Complete(); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeTrue(); - var remover = _source.ExpireAfter(RemoveFunc, _scheduler).Subscribe(); - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(5010).Ticks); + // Ensure that the operator does not attept to continue removing items. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - _source.Count.Should().Be(60, "40 items should have been removed from the cache"); + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + [Fact(Skip = "Existing defect, completion is not propagated from the source")] + public void SourceCompletesImmediately_CompletionIsPropagated() + { + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.Add(item1); + scheduler.AdvanceBy(1); + + source.Complete(); - _scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks); - _source.Count.Should().Be(20, "80 items should have been removed from the cache"); + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); - remover.Dispose(); + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeTrue(); + source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); } - public void Dispose() + [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, operator does not use safe subscriptions")] + public void SourceErrors_ErrorIsPropagated() { - _results.Dispose(); - _source.Dispose(); + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + source.Add(new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); + scheduler.AdvanceBy(1); + + var error = new Exception("This is a test"); + source.SetError(error); + + results.TryGetRecordedError().Should().Be(error, "an error was published"); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeFalse(); + + // Ensure that the operator does not attept to continue removing items. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + [Fact(Skip = "Existing defect, immediately-occuring error is not propagated")] + public void SourceErrorsImmediately_ErrorIsPropagated() + { + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + var item1 = new Item() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; + source.Add(item1); + scheduler.AdvanceBy(1); + + var error = new Exception("This is a test"); + source.SetError(error); + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + results.TryGetRecordedError().Should().Be(error, "an error was published"); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeFalse(); + source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); + + // Ensure that the operator does not attept to continue removing items. + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] - public void ExpireIsCancelledWhenUpdated() + public void SourceIsNull_ThrowsException() + => FluentActions.Invoking(() => ObservableListEx.ExpireAfter( + source: (null as ISourceList)!, + timeSelector: static _ => default, + pollingInterval: null)) + .Should().Throw(); + + [Fact(Skip = "Existing defect, operator does not properly handle items with a null timeout, when using a real scheduler, it passes a TimeSpan to the scheduler that is outside of the supported range")] + public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe() + { + using var source = new TestSourceList(); + + var scheduler = ThreadPoolScheduler.Instance; + + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + var maxExpiration = PerformStressEdits( + source: source, + scheduler: scheduler, + stressCount: 10_000, + minItemLifetime: TimeSpan.FromMilliseconds(10), + maxItemLifetime: TimeSpan.FromMilliseconds(50), + maxChangeCount: 10, + maxRangeSize: 10); + + await Observable.Timer(maxExpiration + TimeSpan.FromMilliseconds(100), scheduler); + + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(static item => item.Expiration.Should().NotBeNull("only items with an expiration should have expired")); + results.TryGetRecordedCompletion().Should().BeFalse(); + source.Items.Should().AllSatisfy(item => item.Expiration.Should().BeNull("all items with an expiration should have expired")); + } + + [Fact(Skip = "Existing defect, deadlocks")] + public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() { - var remover = _source.ExpireAfter(p => TimeSpan.FromMilliseconds(100), _scheduler).Subscribe(); + using var source = new TestSourceList(); + + var scheduler = ThreadPoolScheduler.Instance; - var p1 = new Person("Name1", 20); - var p2 = new Person("Name1", 21); + using var subscription = source + .ExpireAfter( + timeSelector: CreateTimeSelector(scheduler), + pollingInterval: TimeSpan.FromMilliseconds(10), + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); - _source.Add(p1); + var maxExpiration = PerformStressEdits( + source: source, + scheduler: scheduler, + stressCount: 10_000, + minItemLifetime: TimeSpan.FromMilliseconds(10), + maxItemLifetime: TimeSpan.FromMilliseconds(50), + maxChangeCount: 10, + maxRangeSize: 10); - _source.Replace(p1, p2); + await Observable.Timer(maxExpiration + TimeSpan.FromMilliseconds(100), scheduler); - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200).Ticks); - remover.Dispose(); - _results.Data.Count.Should().Be(0, "Should be no data in the cache"); - _results.Messages.Count.Should().Be(3, "Should be 3 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 add in the first message"); - _results.Messages[1].Replaced.Should().Be(1, "Should be 1 update in the second message"); - _results.Messages[2].Removes.Should().Be(1, "Should be 1 remove in the 3rd message"); + results.TryGetRecordedError().Should().BeNull(); + results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(item => item.Expiration.Should().NotBeNull("only items with an expiration should have expired")); + results.TryGetRecordedCompletion().Should().BeFalse(); + source.Items.Should().AllSatisfy(item => item.Expiration.Should().BeNull("all items with an expiration should have expired")); } [Fact] - public void ItemAddedIsExpired() + public void TimeSelectorIsNull_ThrowsException() + => FluentActions.Invoking(() => new TestSourceList().ExpireAfter( + timeSelector: null!, + pollingInterval: null)) + .Should().Throw(); + + [Fact(Skip = "Exsiting defect, errors are re-thrown instead of propagated, user code is not protected")] + public void TimeSelectorThrows_ThrowsException() { - var remover = _source.ExpireAfter(p => TimeSpan.FromMilliseconds(100), _scheduler).Subscribe(); + using var source = new TestSourceList(); + + var scheduler = CreateTestScheduler(); + + var error = new Exception("This is a test."); + + using var subscription = source + .ExpireAfter( + timeSelector: _ => throw error, + scheduler: scheduler) + .ValidateSynchronization() + .RecordNotifications(out var results, scheduler); + + source.Add(new Item() { Id = 1 }); + scheduler.AdvanceBy(1); + + results.TryGetRecordedError().Should().Be(error); + results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.TryGetRecordedCompletion().Should().BeFalse(); + + results.EnumerateInvalidNotifications().Should().BeEmpty(); + } + + private static TestScheduler CreateTestScheduler() + { + var scheduler = new TestScheduler(); + scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(0).Ticks); + + return scheduler; + } - _source.Add(new Person("Name1", 10)); + private static Func CreateTimeSelector(IScheduler scheduler) + => item => item.Expiration - scheduler.Now; - _scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200).Ticks); - remover.Dispose(); + private static DateTimeOffset PerformStressEdits( + ISourceList source, + IScheduler scheduler, + int stressCount, + TimeSpan minItemLifetime, + TimeSpan maxItemLifetime, + int maxChangeCount, + int maxRangeSize) + { + var nextItemId = 1; + var randomizer = new Randomizer(1234567); + var maxExpiration = DateTimeOffset.MinValue; + + for (var i = 0; i < stressCount; ++i) + source.Edit(mutator => + { + var changeCount = randomizer.Int(1, maxChangeCount); + + for (var i = 0; i < changeCount; ++i) + { + var changeReason = mutator.Count switch + { + 0 => randomizer.Enum(exclude: new[] + { + ListChangeReason.Replace, + ListChangeReason.Remove, + ListChangeReason.RemoveRange, + ListChangeReason.Refresh, + ListChangeReason.Moved, + ListChangeReason.Clear + }), + 1 => randomizer.Enum(exclude: new[] + { + ListChangeReason.Refresh, + ListChangeReason.Moved + }), + _ => randomizer.Enum(exclude: ListChangeReason.Refresh) + }; + + switch (changeReason) + { + case ListChangeReason.Add: + mutator.Add(new Item() + { + Id = nextItemId++, + Expiration = GenerateExpiration() + }); + break; + + case ListChangeReason.AddRange: + mutator.AddRange(Enumerable + .Range(0, randomizer.Int(1, maxRangeSize)) + .Select(_ => new Item() + { + Id = nextItemId++, + Expiration = GenerateExpiration() + }) + .ToArray()); + break; + + case ListChangeReason.Replace: + mutator.Replace( + original: randomizer.ListItem(mutator), + replaceWith: new Item() + { + Id = nextItemId++, + Expiration = GenerateExpiration() + }); + break; + + case ListChangeReason.Remove: + mutator.RemoveAt(randomizer.Int(0, mutator.Count - 1)); + break; + + case ListChangeReason.RemoveRange: + var removeCount = randomizer.Int(1, Math.Min(maxRangeSize, mutator.Count)); + mutator.RemoveRange( + index: randomizer.Int(0, mutator.Count - removeCount), + count: removeCount); + break; + + case ListChangeReason.Moved: + int originalIndex; + int destinationIndex; + + do + { + originalIndex = randomizer.Int(0, mutator.Count - 1); + destinationIndex = randomizer.Int(0, mutator.Count - 1); + } while (originalIndex != destinationIndex); + + mutator.Move(originalIndex, destinationIndex); + break; + + case ListChangeReason.Clear: + mutator.Clear(); + break; + } + } + }); + + return maxExpiration; + + DateTimeOffset? GenerateExpiration() + { + if (randomizer.Bool()) + return null; + + var expiration = scheduler.Now + randomizer.TimeSpan(minItemLifetime, maxItemLifetime); + if (expiration > maxExpiration) + maxExpiration = expiration; + + return expiration; + } + } + + private class Item + { + public required int Id { get; init; } - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[0].Adds.Should().Be(1, "Should be 1 adds in the first update"); - _results.Messages[1].Removes.Should().Be(1, "Should be 1 removes in the second update"); + public DateTimeOffset? Expiration { get; set; } } } diff --git a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs index b392f163a..c67aaedcb 100644 --- a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs +++ b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive; +using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; @@ -48,6 +49,15 @@ public static IObservable Parallelize(this IObservable source, int c public static IObservable Parallelize(this IObservable source, int count, int parallel) => Observable.Merge(Distribute(count, parallel).Select(n => source.Take(n))); + public static IDisposable RecordNotifications( + this IObservable source, + out TestableObserver observer, + IScheduler? scheduler = null) + { + observer = TestableObserver.Create(scheduler); + + return source.Subscribe(observer); + } public static IObservable ValidateSynchronization(this IObservable source) // Using Raw observable and observer classes to bypass normal RX safeguards, which prevent out-of-sequence notifications. // This allows the operator to be combined with TestableObserver, for correctness-testing of operators.