Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make IInstantJobRegistry members return the job correlation id #153

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ All notable changes to **NCronJob** will be documented in this file. The project
- Expose typed version of `DisableJob()`. Added in [#151](https://github.com/NCronJob-Dev/NCronJob/issues/151), by [@nulltoken](https://github.com/nulltoken).
- Expose typed version of `EnableJob()`. Added in [#151](https://github.com/NCronJob-Dev/NCronJob/issues/151), by [@nulltoken](https://github.com/nulltoken).

### Changed

- Teach `IInstantJobRegistry` members to return the job correlation id. Changed in [#153](https://github.com/NCronJob-Dev/NCronJob/issues/153), by [@nulltoken](https://github.com/nulltoken).

### Fixed

- Make `RemoveJob<TJob>()` and `RemoveJob(Type)` remove all jobs of the given type. Fixed in [#151](https://github.com/NCronJob-Dev/NCronJob/issues/151), by [@nulltoken](https://github.com/nulltoken).
Expand Down
12 changes: 12 additions & 0 deletions docs/features/instant-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,15 @@ app.MapPost("/send-email", (RequestDto dto, IInstantJobRegistry jobRegistry) =>
return TypedResults.Ok();
});
```

## Instrumentation

All members of the `IInstantJobRegistry` interface return the correlation id of the triggered job (See [*"Tracing requests of dependencies via `CorrelationId`"*](./model-dependencies.md#tracing-requests-of-dependencies-via-correlationid).).

```csharp
Guid oneCorrelationId = jobRegistry.RunInstantJob<MyJob>();

Guid anotherCorrelationId = jobRegistry.RunScheduledJob<MyJob>(TimeSpan.FromMinutes(5));

[...]
```
76 changes: 44 additions & 32 deletions src/NCronJob/Registry/IInstantJobRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// </summary>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// This is a fire-and-forget process, the Job will be queued with high priority and run in the background. The contents of <paramref name="parameter" />
/// are not serialized and deserialized. It is the reference to the <paramref name="parameter"/>-object that gets passed in.
Expand All @@ -24,7 +25,7 @@
/// instantJobRegistry.RunInstantJob&lt;MyJob&gt;(new MyParameterObject { Foo = "Bar" });
/// </code>
/// </example>
void RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
Guid RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -36,15 +37,17 @@
/// </remarks>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="token">An optional token to cancel the job.</param>
void RunInstantJob(Delegate jobDelegate, CancellationToken token = default);
/// <returns>The job correlation id.</returns>
Guid RunInstantJob(Delegate jobDelegate, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed after the given <paramref name="delay"/>.
/// </summary>
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
/// <returns>The job correlation id.</returns>
Guid RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -53,7 +56,8 @@
/// <param name="startDate">The starting point when the job will be executed.</param>
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
/// <returns>The job correlation id.</returns>
Guid RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -62,47 +66,51 @@
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);
Guid RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed at the given <paramref name="startDate"/>.
/// </summary>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="startDate">The starting point when the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);
Guid RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed after the given <paramref name="delay"/>. The job will not be queued into the JobQueue, but executed directly.
/// </summary>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);
Guid ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed at the given <paramref name="startDate"/>. The job will not be queued into the JobQueue, but executed directly.
/// </summary>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="startDate">The starting point when the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);
Guid ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed after the given <paramref name="delay"/>. The job will not be queued into the JobQueue, but executed directly.
Expand All @@ -111,14 +119,16 @@
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
void ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
/// <returns>The job correlation id.</returns>
Guid ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
/// Runs an instant job to the registry, which will be executed even if the job is not registered and the concurrency is exceeded.
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// </summary>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// This is a fire-and-forget process, the Job will be run immediately in the background. The contents of <paramref name="parameter" />
/// are not serialized and deserialized. It is the reference to the <paramref name="parameter"/>-object that gets passed in.
Expand All @@ -129,7 +139,7 @@
/// instantJobRegistry.RunInstantJob&lt;MyJob&gt;(new MyParameterObject { Foo = "Bar" });
/// </code>
/// </example>
void ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
Guid ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -141,8 +151,8 @@
/// </remarks>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="token">An optional token to cancel the job.</param>
void ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default);

/// <returns>The job correlation id.</returns>
Guid ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default);
}

internal sealed partial class InstantJobRegistry : IInstantJobRegistry
Expand All @@ -168,71 +178,71 @@
}

/// <inheritdoc />
public void RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
public Guid RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob => RunScheduledJob<TJob>(TimeSpan.Zero, parameter, token);

/// <inheritdoc />
public void RunInstantJob(Delegate jobDelegate, CancellationToken token = default) => RunScheduledJob(jobDelegate, TimeSpan.Zero, token);
public Guid RunInstantJob(Delegate jobDelegate, CancellationToken token = default) => RunScheduledJob(jobDelegate, TimeSpan.Zero, token);

/// <inheritdoc />
public void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
public Guid RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob
{
var utcNow = timeProvider.GetUtcNow();
RunJob<TJob>(utcNow + delay, parameter, false, token);
return RunJob<TJob>(utcNow + delay, parameter, false, token);
}

/// <inheritdoc />
public void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
public Guid RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
where TJob : IJob =>
RunJob<TJob>(startDate, parameter, false, token);

/// <inheritdoc />
public void RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
public Guid RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
{
var utcNow = timeProvider.GetUtcNow();
RunScheduledJob(jobDelegate, utcNow + delay, token);
return RunScheduledJob(jobDelegate, utcNow + delay, token);
}

/// <inheritdoc />
public void RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
public Guid RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
RunDelegateJob(jobDelegate, startDate, false, token);

/// <inheritdoc />
public void ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
public Guid ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob
{
var utcNow = timeProvider.GetUtcNow();
RunJob<TJob>(utcNow + delay, parameter, true, token);
return RunJob<TJob>(utcNow + delay, parameter, true, token);
}

/// <inheritdoc />
public void ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
public Guid ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
{
var utcNow = timeProvider.GetUtcNow();
ForceRunScheduledJob(jobDelegate, utcNow + delay, token);
return ForceRunScheduledJob(jobDelegate, utcNow + delay, token);

Check warning on line 223 in src/NCronJob/Registry/IInstantJobRegistry.cs

View check run for this annotation

Codecov / codecov/patch

src/NCronJob/Registry/IInstantJobRegistry.cs#L223

Added line #L223 was not covered by tests
}

/// <inheritdoc />
public void ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
public Guid ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
RunDelegateJob(jobDelegate, startDate, true, token);

/// <inheritdoc />
public void ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default) =>
public Guid ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default) =>
ForceRunScheduledJob(jobDelegate, TimeSpan.Zero, token);

/// <inheritdoc />
public void ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
public Guid ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob => ForceRunScheduledJob<TJob>(TimeSpan.Zero, parameter, token);

private void RunDelegateJob(Delegate jobDelegate, DateTimeOffset startDate, bool forceExecution = false, CancellationToken token = default)
private Guid RunDelegateJob(Delegate jobDelegate, DateTimeOffset startDate, bool forceExecution = false, CancellationToken token = default)
{
var definition = jobRegistry.AddDynamicJob(jobDelegate);

RunInternal(definition, null, startDate, forceExecution, token);
return RunInternal(definition, null, startDate, forceExecution, token);
}

private void RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bool forceExecution = false, CancellationToken token = default)
private Guid RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bool forceExecution = false, CancellationToken token = default)
where TJob : IJob
{
using (logger.BeginScope("Triggering RunScheduledJob:"))
Expand All @@ -247,11 +257,11 @@

token.Register(() => LogCancellationRequested(parameter));

RunInternal(jobDefinition, parameter, startDate, forceExecution, token);
return RunInternal(jobDefinition, parameter, startDate, forceExecution, token);
}
}

private void RunInternal(
private Guid RunInternal(
JobDefinition jobDefinition,
object? parameter,
DateTimeOffset startDate,
Expand All @@ -273,6 +283,8 @@
jobQueue.EnqueueForDirectExecution(run, startDate);
jobQueueManager.SignalJobQueue(run.JobDefinition.JobFullName);
}

return run.CorrelationId;
}

[LoggerMessage(LogLevel.Warning, "Job {JobName} cancelled by request.")]
Expand Down
3 changes: 2 additions & 1 deletion tests/NCronJob.Tests/RunDependentJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ public async Task CorrelationIdIsSharedByJobsAndTheirDependencies()
var provider = CreateServiceProvider();
await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);

provider.GetRequiredService<IInstantJobRegistry>().ForceRunInstantJob<PrincipalCorrelationIdJob>(token: CancellationToken);
var correlationId = provider.GetRequiredService<IInstantJobRegistry>().ForceRunInstantJob<PrincipalCorrelationIdJob>(token: CancellationToken);

await CommunicationChannel.Reader.ReadAsync(CancellationToken);
var storage = provider.GetRequiredService<Storage>();
storage.Guids.Count.ShouldBe(2);
storage.Guids.Distinct().Count().ShouldBe(1);
storage.Guids.First().ShouldBe(correlationId);
}

[Fact]
Expand Down
Loading