Skip to content

Commit

Permalink
should not miss late fires
Browse files Browse the repository at this point in the history
  • Loading branch information
suraciii committed Oct 8, 2023
1 parent db4f7da commit 20a7170
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 26 deletions.
25 changes: 16 additions & 9 deletions src/Fabron.Core/Schedulers/CronScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,25 @@ internal override async Task Tick(DateTimeOffset expectedTickTime)
}

var from = expectedTickTime;
var to = from.AddMinutes(2);
if (to > _state.Spec.NotAfter)
{
to = _state.Spec.NotAfter.Value;
}
var to = from;

var cron = CronExpression.Parse(_state.Spec.Schedule, _options.CronFormat);
var schedules = cron.GetOccurrences(from, to, _options.TimeZone, fromInclusive: true, toInclusive: false);
foreach (var schedule in schedules)

do
{
Dispatch(schedule);
}
to = from.AddMinutes(2);
if (to > _state.Spec.NotAfter)
{
to = _state.Spec.NotAfter.Value;
}

var schedules = cron.GetOccurrences(from, to, _options.TimeZone, fromInclusive: true, toInclusive: false);
foreach (var schedule in schedules)
{
Dispatch(schedule);
}
from = to;
} while (from < now);

var next = to;
var nextTick = cron.GetNextOccurrence(next, _options.TimeZone, inclusive: true);
Expand Down
26 changes: 17 additions & 9 deletions src/Fabron.Core/Schedulers/PeriodicScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private Task StartTicker()
return notBefore switch
{
not null when notBefore.Value > utcNow => Tick(notBefore.Value),
_ => Tick(default)
_ => Tick(utcNow)
};
}

Expand Down Expand Up @@ -104,12 +104,20 @@ internal override async Task Tick(DateTimeOffset expectedTickTime)
return;
}

var to = now.AddMinutes(1);
if (_state.Spec.NotAfter.HasValue && to > _state.Spec.NotAfter)
var from = expectedTickTime;
var to = from;
DateTimeOffset nextTick;
do
{
to = _state.Spec.NotAfter.Value;
}
var nextTick = Dispatch(now, to);
to = from.AddMinutes(1);
if (to > _state.Spec.NotAfter)
{
to = _state.Spec.NotAfter.Value;
}
nextTick = Dispatch(from, to);
from = nextTick;
} while (from < now);

if (_state.Spec.NotAfter.HasValue && nextTick > _state.Spec.NotAfter.Value)
{
// no more next tick
Expand All @@ -119,17 +127,17 @@ internal override async Task Tick(DateTimeOffset expectedTickTime)
await TickAfter(now, nextTick);
}

private DateTimeOffset Dispatch(DateTimeOffset now, DateTimeOffset to)
private DateTimeOffset Dispatch(DateTimeOffset from, DateTimeOffset to)
{
Guard.IsNotNull(_state, nameof(_state));
var nextTick = now;
var nextTick = from;
var dueTime = TimeSpan.Zero;
while (nextTick < to)
{
var envelop = _state.ToEnvelop(nextTick);
FireAfter(envelop, dueTime);
dueTime = dueTime.Add(_state.Spec.Period);
nextTick = now.Add(dueTime);
nextTick = from.Add(dueTime);
}
return nextTick;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Fabron.Core/Schedulers/SchedulerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public SchedulerGrain(
protected string? _eTag = default!;
private IGrainReminder? _tickReminder;

protected async Task LoadStateAsync()
internal async Task LoadStateAsync()
{
//using var _ = Telemetry.ActivitySource.StartActivity("Load State");
var entry = await _store.GetAsync(_key);
Expand Down
2 changes: 1 addition & 1 deletion src/Fabron.Core/Schedulers/TickerLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static partial class TickerLog

[LoggerMessage(
Level = LogLevel.Debug,
Message = "[{key}]: Dispatching at {now:o}, expeced: {expected:o}")]
Message = "[{key}]: Dispatching at {now:o}, expected: {expected:o}")]
public static partial void Dispatching(ILogger logger, string key, DateTimeOffset now, DateTimeOffset expected);

[LoggerMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public async Task ShouldNotDispatchDuplicatedTicks()
{
var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain();
await (scheduler as IGrainBase).OnActivateAsync(default);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 1, TimeSpan.Zero);
await scheduler.Schedule(
JsonSerializer.Serialize(new { foo = "bar" }),
new CronTimerSpec(Schedule: "0 */3 * * * *"),
Expand Down Expand Up @@ -252,4 +252,27 @@ await scheduler.Schedule(
var fire = timerRegistry.Timers.Single();
fire.DueTime.Should().Be(TimeSpan.Zero);
}

[Fact]
public async Task ShouldNotMissLateFires()
{
var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain();
await (scheduler as IGrainBase).OnActivateAsync(default);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
await scheduler.Schedule(
JsonSerializer.Serialize(new { foo = "bar" }),
new CronTimerSpec(Schedule: "* * * * * *"),
null,
null);

clock.UtcNow = DateTimeOffset.Parse("2020-01-01T00:02:00.0081234+00:00");
await ((IRemindable)scheduler).ReceiveReminder(
Names.TickerReminder,
new TickStatus(
DateTime.Parse("2020-01-01T00:01:59.9998105+00:00"),
TimeSpan.FromMinutes(2),
DateTime.Parse("2020-01-01T00:01:59.9998105+00:00")
));
timerRegistry.Timers.Count.Should().Be(4 * 60);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ await scheduler.Schedule(
[Fact]
public async Task ShouldNotScheduleLaterThanNotAfterTime()
{
var (scheduler, _, reminderRegistry, clock, _, _) = PrepareGrain();
var (scheduler, _, reminderRegistry, clock, store, _) = PrepareGrain();
await (scheduler as IGrainBase).OnActivateAsync(default);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
var notAfter = clock.UtcNow.AddDays(30).AddSeconds(15);
Expand All @@ -148,11 +148,20 @@ await scheduler.Schedule(
new PeriodicTimerSpec(TimeSpan.FromSeconds(10), null, notAfter),
null,
null);

var entry = await store.GetAsync(scheduler.GetPrimaryKeyString());
clock.UtcNow = clock.UtcNow.AddDays(30);

await ((IRemindable)scheduler).ReceiveReminder(Names.TickerReminder, new TickStatus(new DateTimeOffset(2020, 1, 1, 1, 0, 0, TimeSpan.Zero).DateTime, TimeSpan.FromMinutes(2), clock.UtcNow.AddMilliseconds(100).DateTime));

entry!.State.Status.NextTick = clock.UtcNow;
await store.SetAsync(entry.State, entry.ETag);
clock.UtcNow.AddMilliseconds(50);

await scheduler.LoadStateAsync();
await ((IRemindable)scheduler).ReceiveReminder(
Names.TickerReminder,
new TickStatus(
DateTime.Parse("2020-01-02T23:59:59.9998105+00:00"),
TimeSpan.FromMinutes(2),
DateTime.Parse("2020-01-02T23:59:59.9998105+00:00")
));
reminderRegistry.Reminders.Should().HaveCount(0);
}

Expand Down Expand Up @@ -204,6 +213,30 @@ await scheduler.Schedule(

dispatcher.Envelops.Count.Should().Be(6);
}

[Fact]
public async Task ShouldNotMissLateFires()
{
var (scheduler, timerRegistry, reminderRegistry, clock, _, dispatcher) = PrepareGrain();
await (scheduler as IGrainBase).OnActivateAsync(default);
clock.UtcNow = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
await scheduler.Schedule(
JsonSerializer.Serialize(new { foo = "bar" }),
new PeriodicTimerSpec(TimeSpan.FromSeconds(5)),
null,
null);

clock.UtcNow = DateTimeOffset.Parse("2020-01-01T00:02:00.0081234+00:00");
await ((IRemindable)scheduler).ReceiveReminder(
Names.TickerReminder,
new TickStatus(
DateTime.Parse("2020-01-01T00:01:59.9998105+00:00"),
TimeSpan.FromMinutes(2),
DateTime.Parse("2020-01-01T00:01:59.9998105+00:00")
));
timerRegistry.Timers.Count.Should().Be(3 * 60 / 5);
}

}

internal record PeriodicFakes(
Expand Down

0 comments on commit 20a7170

Please sign in to comment.