Skip to content

Commit

Permalink
feat: Startup Jobs are run before anything else is running
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdotnet committed Nov 1, 2024
1 parent 5b58171 commit c6066d9
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 23 deletions.
2 changes: 2 additions & 0 deletions sample/RunOnceSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@

var app = builder.Build();

await app.UseNCronJobAsync();

await app.RunAsync();
11 changes: 10 additions & 1 deletion src/NCronJob/Configuration/Builder/NCronJobOptionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,18 @@ public interface IStartupStage<TJob> : INotificationStage<TJob>
where TJob : class, IJob
{
/// <summary>
/// Configures the job to run once during the application startup before any other jobs.
/// Configures the job to run once before the application itself runs.
/// </summary>
/// <returns>Returns a <see cref="INotificationStage{TJob}"/> that allows adding notifications of another job.</returns>
/// <remarks>
/// If a job is marked to run at startup, it will be executed before any `IHostedService` is started. Use the <seealso cref="NCronJobExtensions.UseNCronJob"/> method to trigger the job execution.
/// In the context of ASP.NET:
/// <code>
/// await app.UseNCronJobAsync();
/// await app.RunAsync();
/// </code>
/// All startup jobs will be executed (and awaited) before the web application is started. This is particular useful for migration and cache hydration.
/// </remarks>
INotificationStage<TJob> RunAtStartup();
}

Expand Down
39 changes: 29 additions & 10 deletions src/NCronJob/NCronJobExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;

namespace NCronJob;

/// <summary>
/// Extensions for the <see cref="IServiceCollection"/> to add cron jobs.
/// Extensions for various types to use NCronJob.
/// </summary>
public static class NCronJobExtensions
{
Expand Down Expand Up @@ -42,18 +43,36 @@ public static IServiceCollection AddNCronJob(
services.TryAddSingleton<JobExecutor>();
services.TryAddSingleton<IRetryHandler, RetryHandler>();
services.TryAddSingleton<IInstantJobRegistry, InstantJobRegistry>();
services.TryAddSingleton<IRuntimeJobRegistry, RuntimeJobRegistry>((sp) =>
{
return new RuntimeJobRegistry(
services,
jobRegistry,
sp.GetRequiredService<JobWorker>(),
sp.GetRequiredService<JobQueueManager>(),
sp.GetRequiredService<ConcurrencySettings>());
});
services.TryAddSingleton<IRuntimeJobRegistry, RuntimeJobRegistry>(sp => new RuntimeJobRegistry(
services,
jobRegistry,
sp.GetRequiredService<JobWorker>(),
sp.GetRequiredService<JobQueueManager>(),
sp.GetRequiredService<ConcurrencySettings>()));
services.TryAddSingleton(TimeProvider.System);
services.TryAddSingleton<StartupJobManager>();

return services;
}

/// <summary>
/// Configures the host to use NCronJob. This will also start any given startup jobs and their dependencies.
/// </summary>
/// <param name="host">The host.</param>
public static IHost UseNCronJob(this IHost host) => UseNCronJobAsync(host).ConfigureAwait(false).GetAwaiter().GetResult();

/// <summary>
/// Configures the host to use NCronJob. This will also start any given startup jobs and their dependencies.
/// </summary>
/// <param name="host">The host.</param>
public static async Task<IHost> UseNCronJobAsync(this IHost host)
{
ArgumentNullException.ThrowIfNull(host);

var jobManager = host.Services.GetRequiredService<StartupJobManager>();
var stopToken = host.Services.GetRequiredService<IHostApplicationLifetime>().ApplicationStopping;
await jobManager.ProcessStartupJobs(stopToken);

return host;
}
}
5 changes: 0 additions & 5 deletions src/NCronJob/Scheduler/QueueWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ internal sealed partial class QueueWorker : BackgroundService
private readonly JobQueueManager jobQueueManager;
private readonly JobWorker jobWorker;
private readonly JobRegistry jobRegistry;
private readonly StartupJobManager startupJobManager;
private readonly ILogger<QueueWorker> logger;
private CancellationTokenSource? shutdown;
private readonly ConcurrentDictionary<string, Task?> workerTasks = new();
Expand All @@ -21,14 +20,12 @@ public QueueWorker(
JobQueueManager jobQueueManager,
JobWorker jobWorker,
JobRegistry jobRegistry,
StartupJobManager startupJobManager,
ILogger<QueueWorker> logger,
IHostApplicationLifetime lifetime)
{
this.jobQueueManager = jobQueueManager;
this.jobWorker = jobWorker;
this.jobRegistry = jobRegistry;
this.startupJobManager = startupJobManager;
this.logger = logger;

lifetime.ApplicationStopping.Register(() => shutdown?.Cancel());
Expand Down Expand Up @@ -99,9 +96,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

try
{
await startupJobManager.ProcessStartupJobs(stopToken).ConfigureAwait(false);
ScheduleInitialJobs();
await startupJobManager.WaitForStartupJobsCompletion().ConfigureAwait(false);

CreateWorkerQueues(stopToken);
jobQueueManager.QueueAdded += OnQueueAdded; // this needs to come after we create the initial Worker Queues
Expand Down
10 changes: 3 additions & 7 deletions tests/NCronJob.Tests/NCronJob.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@
</PackageReference>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net9.0'">
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.0-*" />
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<Using Include="Xunit" />
Expand Down
141 changes: 141 additions & 0 deletions tests/NCronJob.Tests/RunAtStartupJobTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;

namespace NCronJob.Tests;

public class RunAtStartupJobTests : JobIntegrationBase
{
[Fact]
public async Task UseNCronJobShouldTriggerStartupJobs()
{
var builder = WebApplication.CreateSlimBuilder();
var storage = new Storage();
builder.Services.AddNCronJob(s => s.AddJob<SimpleJob>().RunAtStartup());
builder.Services.AddSingleton(_ => storage);
await using var app = builder.Build();

await app.UseNCronJobAsync();

storage.Content.Count.ShouldBe(1);
storage.Content[0].ShouldBe("SimpleJob");
}

[Fact]
public async Task ShouldStartStartupJobsBeforeApplicationIsSpunUp()
{
var builder = WebApplication.CreateSlimBuilder();
var storage = new Storage();
builder.Services.AddNCronJob(s => s.AddJob<SimpleJob>().RunAtStartup());
builder.Services.AddSingleton(_ => storage);
builder.Services.AddHostedService<StartingService>();
await using var app = builder.Build();

await app.UseNCronJobAsync();
await RunApp(app);

storage.Content.Count.ShouldBe(2);
storage.Content[0].ShouldBe("SimpleJob");
storage.Content[1].ShouldBe("StartingService");
}

[Fact]
public async Task StartupJobThatThrowsShouldNotPreventHostFromStarting()
{
var builder = WebApplication.CreateSlimBuilder();
var storage = new Storage();
builder.Services.AddNCronJob(s =>
{
s.AddJob<FailingJob>().RunAtStartup();
s.AddExceptionHandler<ExceptionHandler>();
});
builder.Services.AddSingleton(_ => storage);
await using var app = builder.Build();

await app.UseNCronJobAsync();
await RunApp(app);

storage.Content.Count.ShouldBe(1);
storage.Content[0].ShouldBe("ExceptionHandler");
}

[SuppressMessage("Major Code Smell", "S108:Nested blocks of code should not be left empty", Justification = "On purpose")]
private static async Task RunApp(IHost app, TimeSpan? runtime = null)
{
using var cts = new CancellationTokenSource(runtime ?? TimeSpan.FromSeconds(1));
try
{
await app.RunAsync(cts.Token);
}
catch (OperationCanceledException)
{
}
}

private sealed class Storage
{
#if NET8_0
private readonly object locker = new();
#else
private readonly Lock locker = new();
#endif
public List<string> Content { get; private set; } = [];

public void Add(string content)
{
lock (locker)
{
Content.Add(content);
}
}
}

private sealed class StartingService : IHostedService
{
private readonly Storage storage;

public StartingService(Storage storage) => this.storage = storage;

public Task StartAsync(CancellationToken cancellationToken)
{
storage.Add("StartingService");
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

private sealed class SimpleJob: IJob
{
private readonly Storage storage;

public SimpleJob(Storage storage) => this.storage = storage;

public Task RunAsync(IJobExecutionContext context, CancellationToken token)
{
storage.Add("SimpleJob");
return Task.CompletedTask;
}
}

private sealed class FailingJob: IJob
{
public Task RunAsync(IJobExecutionContext context, CancellationToken token) => throw new InvalidOperationException("Failed");
}

private sealed class ExceptionHandler : IExceptionHandler
{
private readonly Storage storage;

public ExceptionHandler(Storage storage) => this.storage = storage;


public Task<bool> TryHandleAsync(IJobExecutionContext jobExecutionContext, Exception exception, CancellationToken cancellationToken)
{
storage.Add("ExceptionHandler");
return Task.FromResult(true);
}
}
}

0 comments on commit c6066d9

Please sign in to comment.