Skip to content

Commit

Permalink
Avoid unexpect fire when ticking for notBefore
Browse files Browse the repository at this point in the history
  • Loading branch information
suraciii committed Sep 1, 2023
1 parent d109451 commit 9f7f6a4
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 45 deletions.
42 changes: 23 additions & 19 deletions src/Fabron.Core/Schedulers/CronScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using CommunityToolkit.Diagnostics;
using Cronos;
using Fabron.Dispatching;
using Fabron.Models;
using Fabron.Stores;
Expand Down Expand Up @@ -68,11 +69,19 @@ private Task StartTicker()
Guard.IsNotNull(_state);
var utcNow = _clock.UtcNow;
var notBefore = _state.Spec.NotBefore;
return notBefore switch
if (_state.Spec.NotAfter is { } notAfter && utcNow > notAfter)
{
not null when notBefore.Value > utcNow => Tick(notBefore.Value),
_ => Tick(default)
};
return Task.CompletedTask;
}

var cron = CronExpression.Parse(_state.Spec.Schedule, _options.CronFormat);
var nextTick = cron.GetNextOccurrence(notBefore ?? utcNow, _options.TimeZone, inclusive: true);
if (nextTick == null)
{
// could this happen?
return Task.CompletedTask;
}
return TickAfter(utcNow, nextTick.Value);
}

public Task Tick()
Expand All @@ -87,12 +96,11 @@ internal override async Task Tick(DateTimeOffset expectedTickTime)
var now = _clock.UtcNow;
TickerLog.Ticking(_logger, _key, now, expectedTickTime);

var shouldDispatchForCurrentTick = expectedTickTime != default;
if (shouldDispatchForCurrentTick && now.Subtract(expectedTickTime) > TimeSpan.FromMinutes(5))
if (now.Subtract(expectedTickTime) > TimeSpan.FromMinutes(2))
{
TickerLog.UnexpectedTick(_logger, _key, expectedTickTime, "Missed");
shouldDispatchForCurrentTick = false;
}

if (_state is null || _state.Metadata.DeletionTimestamp is not null)
{
TickerLog.UnexpectedTick(_logger, _key, expectedTickTime, "NotRegistered");
Expand All @@ -105,34 +113,30 @@ internal override async Task Tick(DateTimeOffset expectedTickTime)
await StopTicker();
return;
}
if (_state.Spec.NotBefore.HasValue && now < _state.Spec.NotBefore.Value)

if (_state.Spec.NotBefore is { } notBefore && now < notBefore)
{
TickerLog.UnexpectedTick(_logger, _key, expectedTickTime, "NotStarted");
await TickAfter(now, _state.Spec.NotBefore.Value);
await StartTicker();
return;
}

var cron = Cronos.CronExpression.Parse(_state.Spec.Schedule, _options.CronFormat);

var from = shouldDispatchForCurrentTick ? expectedTickTime : now;
var from = expectedTickTime;
var to = from.AddMinutes(2);
if (to > _state.Spec.NotAfter)
{
to = _state.Spec.NotAfter.Value;
}

var schedules = cron.GetOccurrences(from, to, _options.TimeZone, fromInclusive: false, toInclusive: false);
if (shouldDispatchForCurrentTick)
{
Dispatch(expectedTickTime);
}
var cron = CronExpression.Parse(_state.Spec.Schedule, _options.CronFormat);
var schedules = cron.GetOccurrences(from, to, _options.TimeZone, fromInclusive: true, toInclusive: false);
foreach (var schedule in schedules)
{
Dispatch(schedule);
}

from = to;
var nextTick = cron.GetNextOccurrence(from, _options.TimeZone, inclusive: true);
var next = to;
var nextTick = cron.GetNextOccurrence(next, _options.TimeZone, inclusive: true);
if (!nextTick.HasValue || (_state.Spec.NotAfter.HasValue && nextTick.Value > _state.Spec.NotAfter.Value))
{
// no more next tick
Expand Down
2 changes: 1 addition & 1 deletion src/Fabron.Core/Schedulers/SchedulerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,6 @@ async Task IRemindable.ReceiveReminder(string reminderName, TickStatus status)
return;
}
}
await Tick(_state?.Status.NextTick ?? default);
await Tick(_state?.Status.NextTick ?? _clock.UtcNow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,25 @@ await scheduler.Schedule(


[Fact]
public async Task ShouldIgnoreMissedTick()
public async Task ShouldNotIgnoreMissedTick()
{
var (scheduler, _, reminderRegistry, clock, _, _) = PrepareGrain();
var (scheduler, timerRegistry, reminderRegistry, clock, _, _) = PrepareGrain();
await (scheduler as IGrainBase).OnActivateAsync(default);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
await scheduler.Schedule(
JsonSerializer.Serialize(new { foo = "bar" }),
new CronTimerSpec(Schedule: "0 0 0 * * *"),
new CronTimerSpec(Schedule: "0 0 0 2 * *"),
null,
null);

clock.UtcNow = new DateTimeOffset(2021, 1, 1, 0, 0, 0, TimeSpan.Zero);
var tickTime = new DateTimeOffset(2020, 1, 2, 0, 0, 0, TimeSpan.Zero);
clock.UtcNow = tickTime.AddMinutes(5);

await ((IRemindable)scheduler).ReceiveReminder(Names.TickerReminder, new TickStatus(new DateTimeOffset(2020, 1, 1, 1, 0, 0, TimeSpan.Zero).DateTime, TimeSpan.FromMinutes(2), clock.UtcNow.AddMilliseconds(100).DateTime));
await ((IRemindable)scheduler).ReceiveReminder(Names.TickerReminder, new TickStatus(tickTime.UtcDateTime, TimeSpan.FromMinutes(2), tickTime.UtcDateTime));

timerRegistry.Timers.Should().HaveCount(1);
reminderRegistry.Reminders.Should().HaveCount(1);
reminderRegistry.Reminders.Single().Value.DueTime.Should().Be(new DateTimeOffset(2021, 1, 2, 0, 0, 0, TimeSpan.Zero) - clock.UtcNow);
reminderRegistry.Reminders.Single().Value.DueTime.Should().Be(new DateTimeOffset(2020, 2, 2, 0, 0, 0, TimeSpan.Zero) - clock.UtcNow);
}

[Fact]
Expand Down Expand Up @@ -150,56 +152,60 @@ public async Task ShouldNotScheduleLaterThanNotAfterTime()
var notAfter = clock.UtcNow.AddDays(30).AddSeconds(15);
await scheduler.Schedule(
JsonSerializer.Serialize(new { foo = "bar" }),
new CronTimerSpec(Schedule: "*/10 * * * * *", null, notAfter),
new CronTimerSpec(Schedule: "*/10 * * * * *", clock.UtcNow.AddDays(30), notAfter),
null,
null);

clock.UtcNow = clock.UtcNow.AddDays(30);

clock.UtcNow = clock.UtcNow.AddDays(30).AddMilliseconds(100);

await ((IRemindable)scheduler).ReceiveReminder(Names.TickerReminder, new TickStatus(new DateTimeOffset(2020, 1, 1, 1, 0, 0, TimeSpan.Zero).DateTime, TimeSpan.FromMinutes(2), clock.UtcNow.AddMilliseconds(100).DateTime));

reminderRegistry.Reminders.Should().HaveCount(0);
}

[Fact]
public async Task ShouldScheduleTillNotAfter()
public async Task ShouldNotDispatchDuplicatedTicks()
{
var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain();
await (scheduler as IGrainBase).OnActivateAsync(default);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
await scheduler.Schedule(
JsonSerializer.Serialize(new { foo = "bar" }),
new CronTimerSpec(Schedule: "0 1 0 * * *", null, clock.UtcNow.AddMinutes(2)),
new CronTimerSpec(Schedule: "0 */3 * * * *"),
null,
null);

clock.UtcNow = DateTimeOffset.Parse("2020-01-01T00:03:00.0081234+00:00");
await ((IRemindable)scheduler).ReceiveReminder(
Names.TickerReminder,
new TickStatus(
DateTime.Parse("2020-01-01T00:02:59.9998105+00:00"),
TimeSpan.FromMinutes(2),
DateTime.Parse("2020-01-01T00:02:59.9998105+00:00")
));
timerRegistry.Timers.Count.Should().Be(1);
clock.UtcNow = clock.UtcNow.AddMinutes(2).AddMilliseconds(100);
await timerRegistry.Timers[0].Trigger();

dispatcher.Envelops.Count.Should().Be(1);
}

[Fact]
public async Task ShouldNotDispatchDuplicatedTicks()
public async Task ShouldScheduleAtFirstTickWhenNotBeforeSet()
{
var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain();
await (scheduler as IGrainBase).OnActivateAsync(default);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
clock.UtcNow = DateTimeOffset.Parse("2020-01-01T00:00:00.000+00:00");
var notBefore = clock.UtcNow.AddDays(29).AddHours(10);
clock.UtcNow = clock.UtcNow.AddMilliseconds(100);
await scheduler.Schedule(
JsonSerializer.Serialize(new { foo = "bar" }),
new CronTimerSpec(Schedule: "0 */3 * * * *"),
new CronTimerSpec(Schedule: "0 0 0 * * *", notBefore),
null,
null);

clock.UtcNow = DateTimeOffset.Parse("2020-01-01T00:03:00.0081234+00:00");
await ((IRemindable)scheduler).ReceiveReminder(
Names.TickerReminder,
new TickStatus(
DateTime.Parse("2020-01-01T00:02:59.9998105+00:00"),
TimeSpan.FromMinutes(2),
DateTime.Parse("2020-01-01T00:02:59.9998105+00:00")
));
var tickReminder = reminderRegistry.Reminders.Single().Value;
tickReminder.DueTime.Should().Be(DateTimeOffset.Parse("2020-01-31T00:00:00.000+00:00") - clock.UtcNow);

clock.UtcNow = DateTimeOffset.Parse("2020-01-31T00:00:00.000+00:00").AddMilliseconds(100);
await tickReminder.FireFor(scheduler, DateTimeOffset.Parse("2020-01-31T00:00:00.000+00:00"));
timerRegistry.Timers.Count.Should().Be(1);
}
}
5 changes: 5 additions & 0 deletions test/Fabron.Core.Test/SchedulerTests/FakeReminderRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@ public FakeGrainReminder(string reminderName, TimeSpan dueTime, TimeSpan period)
public TimeSpan Period { get; }

public string ReminderName { get; }

public async Task FireFor(IRemindable grain, DateTimeOffset time)
{
await grain.ReceiveReminder(ReminderName, new TickStatus(time.UtcDateTime, Period, time.UtcDateTime));
}
}
}

0 comments on commit 9f7f6a4

Please sign in to comment.