From 20a7170684f75c19b051bd0ef7b2ad129a5c38f1 Mon Sep 17 00:00:00 2001 From: Sun Zhongfeng Date: Sun, 8 Oct 2023 18:06:40 +0800 Subject: [PATCH] should not miss late fires --- src/Fabron.Core/Schedulers/CronScheduler.cs | 25 +++++++---- .../Schedulers/PeriodicScheduler.cs | 26 +++++++---- src/Fabron.Core/Schedulers/SchedulerGrain.cs | 2 +- src/Fabron.Core/Schedulers/TickerLog.cs | 2 +- .../CronTimerTickingTests.cs | 25 ++++++++++- .../PeriodicTimerTickingTests.cs | 43 ++++++++++++++++--- 6 files changed, 97 insertions(+), 26 deletions(-) diff --git a/src/Fabron.Core/Schedulers/CronScheduler.cs b/src/Fabron.Core/Schedulers/CronScheduler.cs index 6f12fda..88c8876 100644 --- a/src/Fabron.Core/Schedulers/CronScheduler.cs +++ b/src/Fabron.Core/Schedulers/CronScheduler.cs @@ -121,18 +121,25 @@ internal override async Task Tick(DateTimeOffset expectedTickTime) } var from = expectedTickTime; - var to = from.AddMinutes(2); - if (to > _state.Spec.NotAfter) - { - to = _state.Spec.NotAfter.Value; - } + var to = from; var cron = CronExpression.Parse(_state.Spec.Schedule, _options.CronFormat); - var schedules = cron.GetOccurrences(from, to, _options.TimeZone, fromInclusive: true, toInclusive: false); - foreach (var schedule in schedules) + + do { - Dispatch(schedule); - } + to = from.AddMinutes(2); + if (to > _state.Spec.NotAfter) + { + to = _state.Spec.NotAfter.Value; + } + + var schedules = cron.GetOccurrences(from, to, _options.TimeZone, fromInclusive: true, toInclusive: false); + foreach (var schedule in schedules) + { + Dispatch(schedule); + } + from = to; + } while (from < now); var next = to; var nextTick = cron.GetNextOccurrence(next, _options.TimeZone, inclusive: true); diff --git a/src/Fabron.Core/Schedulers/PeriodicScheduler.cs b/src/Fabron.Core/Schedulers/PeriodicScheduler.cs index 75d0790..f18e4ba 100644 --- a/src/Fabron.Core/Schedulers/PeriodicScheduler.cs +++ b/src/Fabron.Core/Schedulers/PeriodicScheduler.cs @@ -69,7 +69,7 @@ private Task StartTicker() return notBefore switch { not null when notBefore.Value > utcNow => Tick(notBefore.Value), - _ => Tick(default) + _ => Tick(utcNow) }; } @@ -104,12 +104,20 @@ internal override async Task Tick(DateTimeOffset expectedTickTime) return; } - var to = now.AddMinutes(1); - if (_state.Spec.NotAfter.HasValue && to > _state.Spec.NotAfter) + var from = expectedTickTime; + var to = from; + DateTimeOffset nextTick; + do { - to = _state.Spec.NotAfter.Value; - } - var nextTick = Dispatch(now, to); + to = from.AddMinutes(1); + if (to > _state.Spec.NotAfter) + { + to = _state.Spec.NotAfter.Value; + } + nextTick = Dispatch(from, to); + from = nextTick; + } while (from < now); + if (_state.Spec.NotAfter.HasValue && nextTick > _state.Spec.NotAfter.Value) { // no more next tick @@ -119,17 +127,17 @@ internal override async Task Tick(DateTimeOffset expectedTickTime) await TickAfter(now, nextTick); } - private DateTimeOffset Dispatch(DateTimeOffset now, DateTimeOffset to) + private DateTimeOffset Dispatch(DateTimeOffset from, DateTimeOffset to) { Guard.IsNotNull(_state, nameof(_state)); - var nextTick = now; + var nextTick = from; var dueTime = TimeSpan.Zero; while (nextTick < to) { var envelop = _state.ToEnvelop(nextTick); FireAfter(envelop, dueTime); dueTime = dueTime.Add(_state.Spec.Period); - nextTick = now.Add(dueTime); + nextTick = from.Add(dueTime); } return nextTick; } diff --git a/src/Fabron.Core/Schedulers/SchedulerGrain.cs b/src/Fabron.Core/Schedulers/SchedulerGrain.cs index 41dfa98..4fa5113 100644 --- a/src/Fabron.Core/Schedulers/SchedulerGrain.cs +++ b/src/Fabron.Core/Schedulers/SchedulerGrain.cs @@ -47,7 +47,7 @@ public SchedulerGrain( protected string? _eTag = default!; private IGrainReminder? _tickReminder; - protected async Task LoadStateAsync() + internal async Task LoadStateAsync() { //using var _ = Telemetry.ActivitySource.StartActivity("Load State"); var entry = await _store.GetAsync(_key); diff --git a/src/Fabron.Core/Schedulers/TickerLog.cs b/src/Fabron.Core/Schedulers/TickerLog.cs index 6594819..5e7d48a 100644 --- a/src/Fabron.Core/Schedulers/TickerLog.cs +++ b/src/Fabron.Core/Schedulers/TickerLog.cs @@ -38,7 +38,7 @@ public static partial class TickerLog [LoggerMessage( Level = LogLevel.Debug, - Message = "[{key}]: Dispatching at {now:o}, expeced: {expected:o}")] + Message = "[{key}]: Dispatching at {now:o}, expected: {expected:o}")] public static partial void Dispatching(ILogger logger, string key, DateTimeOffset now, DateTimeOffset expected); [LoggerMessage( diff --git a/test/Fabron.Core.Test/SchedulerTests/CronSchedulerTests/CronTimerTickingTests.cs b/test/Fabron.Core.Test/SchedulerTests/CronSchedulerTests/CronTimerTickingTests.cs index 594ea3a..256e882 100644 --- a/test/Fabron.Core.Test/SchedulerTests/CronSchedulerTests/CronTimerTickingTests.cs +++ b/test/Fabron.Core.Test/SchedulerTests/CronSchedulerTests/CronTimerTickingTests.cs @@ -170,7 +170,7 @@ public async Task ShouldNotDispatchDuplicatedTicks() { var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain(); await (scheduler as IGrainBase).OnActivateAsync(default); - clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero); + clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 1, TimeSpan.Zero); await scheduler.Schedule( JsonSerializer.Serialize(new { foo = "bar" }), new CronTimerSpec(Schedule: "0 */3 * * * *"), @@ -252,4 +252,27 @@ await scheduler.Schedule( var fire = timerRegistry.Timers.Single(); fire.DueTime.Should().Be(TimeSpan.Zero); } + + [Fact] + public async Task ShouldNotMissLateFires() + { + var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain(); + await (scheduler as IGrainBase).OnActivateAsync(default); + clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero); + await scheduler.Schedule( + JsonSerializer.Serialize(new { foo = "bar" }), + new CronTimerSpec(Schedule: "* * * * * *"), + null, + null); + + clock.UtcNow = DateTimeOffset.Parse("2020-01-01T00:02:00.0081234+00:00"); + await ((IRemindable)scheduler).ReceiveReminder( + Names.TickerReminder, + new TickStatus( + DateTime.Parse("2020-01-01T00:01:59.9998105+00:00"), + TimeSpan.FromMinutes(2), + DateTime.Parse("2020-01-01T00:01:59.9998105+00:00") + )); + timerRegistry.Timers.Count.Should().Be(4 * 60); + } } diff --git a/test/Fabron.Core.Test/SchedulerTests/PeriodicTimerTests/PeriodicTimerTickingTests.cs b/test/Fabron.Core.Test/SchedulerTests/PeriodicTimerTests/PeriodicTimerTickingTests.cs index 9ea259c3..ba80d60 100644 --- a/test/Fabron.Core.Test/SchedulerTests/PeriodicTimerTests/PeriodicTimerTickingTests.cs +++ b/test/Fabron.Core.Test/SchedulerTests/PeriodicTimerTests/PeriodicTimerTickingTests.cs @@ -139,7 +139,7 @@ await scheduler.Schedule( [Fact] public async Task ShouldNotScheduleLaterThanNotAfterTime() { - var (scheduler, _, reminderRegistry, clock, _, _) = PrepareGrain(); + var (scheduler, _, reminderRegistry, clock, store, _) = PrepareGrain(); await (scheduler as IGrainBase).OnActivateAsync(default); clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero); var notAfter = clock.UtcNow.AddDays(30).AddSeconds(15); @@ -148,11 +148,20 @@ await scheduler.Schedule( new PeriodicTimerSpec(TimeSpan.FromSeconds(10), null, notAfter), null, null); - + var entry = await store.GetAsync(scheduler.GetPrimaryKeyString()); clock.UtcNow = clock.UtcNow.AddDays(30); - - await ((IRemindable)scheduler).ReceiveReminder(Names.TickerReminder, new TickStatus(new DateTimeOffset(2020, 1, 1, 1, 0, 0, TimeSpan.Zero).DateTime, TimeSpan.FromMinutes(2), clock.UtcNow.AddMilliseconds(100).DateTime)); - + entry!.State.Status.NextTick = clock.UtcNow; + await store.SetAsync(entry.State, entry.ETag); + clock.UtcNow.AddMilliseconds(50); + + await scheduler.LoadStateAsync(); + await ((IRemindable)scheduler).ReceiveReminder( + Names.TickerReminder, + new TickStatus( + DateTime.Parse("2020-01-02T23:59:59.9998105+00:00"), + TimeSpan.FromMinutes(2), + DateTime.Parse("2020-01-02T23:59:59.9998105+00:00") + )); reminderRegistry.Reminders.Should().HaveCount(0); } @@ -204,6 +213,30 @@ await scheduler.Schedule( dispatcher.Envelops.Count.Should().Be(6); } + + [Fact] + public async Task ShouldNotMissLateFires() + { + var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain(); + await (scheduler as IGrainBase).OnActivateAsync(default); + clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero); + await scheduler.Schedule( + JsonSerializer.Serialize(new { foo = "bar" }), + new PeriodicTimerSpec(TimeSpan.FromSeconds(5)), + null, + null); + + clock.UtcNow = DateTimeOffset.Parse("2020-01-01T00:02:00.0081234+00:00"); + await ((IRemindable)scheduler).ReceiveReminder( + Names.TickerReminder, + new TickStatus( + DateTime.Parse("2020-01-01T00:01:59.9998105+00:00"), + TimeSpan.FromMinutes(2), + DateTime.Parse("2020-01-01T00:01:59.9998105+00:00") + )); + timerRegistry.Timers.Count.Should().Be(3 * 60 / 5); + } + } internal record PeriodicFakes(