Skip to content

Commit

Permalink
Prefer JobDefinition to link dependent jobs over Type
Browse files Browse the repository at this point in the history
  • Loading branch information
nulltoken committed Nov 2, 2024
1 parent 252336c commit 0ca85a0
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ namespace NCronJob;

internal sealed class DependentJobRegistryEntry
{
public required Type PrincipalType { get; init; }
public List<JobDefinition> RunWhenSuccess { get; init; } = [];
public List<JobDefinition> RunWhenFaulted { get; init; } = [];
}
39 changes: 25 additions & 14 deletions src/NCronJob/Configuration/Builder/NCronJobOptionBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Cronos;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Diagnostics;
using System.Reflection;

namespace NCronJob;
Expand Down Expand Up @@ -40,8 +41,8 @@ internal NCronJobOptionBuilder(
public IStartupStage<T> AddJob<T>(Action<JobOptionBuilder>? options = null)
where T : class, IJob
{
var builder = AddJobInternal(typeof(T), options);
return new StartupStage<T>(Services, Settings, jobRegistry, builder);
var (builder, jobDefinitions) = AddJobInternal(typeof(T), options);
return new StartupStage<T>(Services, jobDefinitions, Settings, jobRegistry, builder);
}

/// <summary>
Expand All @@ -59,8 +60,8 @@ public IStartupStage<T> AddJob<T>(Action<JobOptionBuilder>? options = null)
/// </example>
public IStartupStage<IJob> AddJob(Type jobType, Action<JobOptionBuilder>? options = null)
{
var builder = AddJobInternal(jobType, options);
return new StartupStage<IJob>(Services, Settings, jobRegistry, builder);
var (builder, jobDefinitions) = AddJobInternal(jobType, options);
return new StartupStage<IJob>(Services, jobDefinitions, Settings, jobRegistry, builder);
}

/// <summary>
Expand Down Expand Up @@ -136,10 +137,14 @@ internal static CronExpression GetCronExpression(string expression)
: throw new InvalidOperationException("Invalid cron expression");
}

private JobOptionBuilder AddJobInternal(Type jobType, Action<JobOptionBuilder>? options)
private (JobOptionBuilder, ICollection<JobDefinition>) AddJobInternal(
Type jobType,
Action<JobOptionBuilder>? options)
{
ValidateConcurrencySetting(jobType);

var jobDefinitions = new List<JobDefinition>();

var builder = new JobOptionBuilder();
options?.Invoke(builder);

Expand All @@ -159,9 +164,10 @@ private JobOptionBuilder AddJobInternal(Type jobType, Action<JobOptionBuilder>?
UserDefinedCronExpression = option.CronExpression
};
jobRegistry.Add(entry);
jobDefinitions.Add(entry);
}

return builder;
return (builder, jobDefinitions);
}

private static bool DetermineAndValidatePrecision(string cronExpression)
Expand Down Expand Up @@ -190,13 +196,16 @@ internal class StartupStage<TJob> : IStartupStage<TJob> where TJob : class, IJob
private readonly ConcurrencySettings settings;
private readonly JobRegistry jobRegistry;
private readonly JobOptionBuilder jobOptionBuilder;
private readonly ICollection<JobDefinition> jobDefinitions;

internal StartupStage(
IServiceCollection services,
ICollection<JobDefinition> jobDefinitions,
ConcurrencySettings settings,
JobRegistry jobRegistry,
JobOptionBuilder jobOptionBuilder)
{
this.jobDefinitions = jobDefinitions;
this.services = services;
this.settings = settings;
this.jobRegistry = jobRegistry;
Expand All @@ -210,20 +219,20 @@ public INotificationStage<TJob> RunAtStartup()

jobRegistry.UpdateJobDefinitionsToRunAtStartup<TJob>();

return new NotificationStage<TJob>(services, settings, jobRegistry);
return new NotificationStage<TJob>(services, jobDefinitions, settings, jobRegistry);
}

/// <inheritdoc />
public INotificationStage<TJob> AddNotificationHandler<TJobNotificationHandler>() where TJobNotificationHandler : class, IJobNotificationHandler<TJob>
{
services.TryAddScoped<IJobNotificationHandler<TJob>, TJobNotificationHandler>();
return new NotificationStage<TJob>(services, settings, jobRegistry);
return new NotificationStage<TJob>(services, jobDefinitions, settings, jobRegistry);
}

/// <inheritdoc />
public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? success = null, Action<DependencyBuilder<TJob>>? faulted = null)
{
ExecuteWhenHelper.AddRegistration(jobRegistry, success, faulted);
ExecuteWhenHelper.AddRegistration(jobRegistry, jobDefinitions, success, faulted);

return this;
}
Expand All @@ -246,15 +255,18 @@ internal class NotificationStage<TJob> : INotificationStage<TJob> where TJob : c
private readonly IServiceCollection services;
private readonly ConcurrencySettings settings;
private readonly JobRegistry jobRegistry;
private readonly ICollection<JobDefinition> jobDefinitions;

internal NotificationStage(
IServiceCollection services,
ICollection<JobDefinition> jobDefinitions,
ConcurrencySettings settings,
JobRegistry jobRegistry)
{
this.services = services;
this.settings = settings;
this.jobRegistry = jobRegistry;
this.jobDefinitions = jobDefinitions;
}

/// <inheritdoc />
Expand All @@ -269,7 +281,7 @@ public INotificationStage<TJob> AddNotificationHandler<TJobNotificationHandler>(
public INotificationStage<TJob> ExecuteWhen(Action<DependencyBuilder<TJob>>? success = null,
Action<DependencyBuilder<TJob>>? faulted = null)
{
ExecuteWhenHelper.AddRegistration(jobRegistry, success, faulted);
ExecuteWhenHelper.AddRegistration(jobRegistry, jobDefinitions, success, faulted);

return this;
}
Expand Down Expand Up @@ -358,6 +370,7 @@ internal static class ExecuteWhenHelper
{
public static void AddRegistration<TJob>(
JobRegistry jobRegistry,
ICollection<JobDefinition> parentJobDefinitions,
Action<DependencyBuilder<TJob>>? success,
Action<DependencyBuilder<TJob>>? faulted)
where TJob : IJob
Expand All @@ -369,11 +382,10 @@ public static void AddRegistration<TJob>(
var runWhenSuccess = dependencyBuilder.GetDependentJobOption();
var entry = new DependentJobRegistryEntry
{
PrincipalType = typeof(TJob),
RunWhenSuccess = runWhenSuccess,

};
jobRegistry.RegisterJobDependency(entry);
jobRegistry.RegisterJobDependency(parentJobDefinitions, entry);
}

if (faulted is not null)
Expand All @@ -383,11 +395,10 @@ public static void AddRegistration<TJob>(
var runWhenFaulted = dependencyBuilder.GetDependentJobOption();
var entry = new DependentJobRegistryEntry
{
PrincipalType = typeof(TJob),
RunWhenFaulted = runWhenFaulted,

};
jobRegistry.RegisterJobDependency(entry);
jobRegistry.RegisterJobDependency(parentJobDefinitions, entry);
}
}
}
4 changes: 2 additions & 2 deletions src/NCronJob/Execution/JobExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public void InformDependentJobs(JobExecutionContext context, bool success)

var jobRun = context.JobRun;
var dependencies = success
? jobRegistry.GetDependentSuccessJobTypes(jobRun.JobDefinition.Type)
: jobRegistry.GetDependentFaultedJobTypes(jobRun.JobDefinition.Type);
? jobRegistry.GetDependentSuccessJobTypes(jobRun.JobDefinition)
: jobRegistry.GetDependentFaultedJobTypes(jobRun.JobDefinition);

if (dependencies.Count > 0)
jobRun.NotifyStateChange(JobStateType.WaitingForDependency);
Expand Down
9 changes: 7 additions & 2 deletions src/NCronJob/Registry/IInstantJobRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using System.Diagnostics;

namespace NCronJob;

Expand Down Expand Up @@ -235,17 +236,21 @@ private void RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bo
{
using (logger.BeginScope("Triggering RunScheduledJob:"))
{
var newJobDefinition = new JobDefinition(typeof(TJob), parameter, null, null);

if (!jobRegistry.IsJobRegistered<TJob>())
{
LogJobNotRegistered(typeof(TJob).Name);
var newJobDefinition = new JobDefinition(typeof(TJob), parameter, null, null);
jobRegistry.Add(newJobDefinition);
}

JobDefinition? jobDefinition = jobRegistry.FindJobDefinition(typeof(TJob));

Debug.Assert(jobDefinition != null);

token.Register(() => LogCancellationRequested(parameter));

RunInternal(newJobDefinition, parameter, startDate, forceExecution, token);
RunInternal(jobDefinition, parameter, startDate, forceExecution, token);
}
}

Expand Down
40 changes: 28 additions & 12 deletions src/NCronJob/Registry/JobDefinition.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
using Cronos;
using NCronJob.Registry;

namespace NCronJob;

internal sealed record JobDefinition(
Type Type,
object? Parameter,
CronExpression? CronExpression,
TimeZoneInfo? TimeZone,
string? JobName = null,
JobExecutionAttributes? JobPolicyMetadata = null)
internal sealed class JobDefinition(
Type type,
object? parameter,
CronExpression? cronExpression,
TimeZoneInfo? timeZone,
string? jobName = null,
JobExecutionAttributes? jobPolicyMetadata = null) : IEquatable<JobDefinition>
{
public Type Type { get; } = type;


public bool IsStartupJob { get; set; }

public string JobName { get; } = JobName ?? Type.Name;
public string JobName => jobName ?? Type.Name;

public string? CustomName { get; set; }

public CronExpression? CronExpression { get; set; } = CronExpression;
public CronExpression? CronExpression { get; set; } = cronExpression;

/// <summary>
/// This is the unhandled cron expression from the user. Using <see cref="CronExpression.ToString"/> will alter the expression.
Expand All @@ -31,9 +35,9 @@ internal sealed record JobDefinition(
/// </summary>
public string? UserDefinedCronExpression { get; set; }

public object? Parameter { get; set; } = Parameter;
public object? Parameter { get; set; } = parameter;

public TimeZoneInfo? TimeZone { get; set; } = TimeZone;
public TimeZoneInfo? TimeZone { get; set; } = timeZone;

/// <summary>
/// The JobFullName is used as a unique identifier for the job type including anonymous jobs. This helps with concurrency management.
Expand All @@ -42,12 +46,24 @@ internal sealed record JobDefinition(
? Type.FullName ?? JobName
: $"{typeof(DynamicJobFactory).Namespace}.{JobName}";

private JobExecutionAttributes JobPolicyMetadata { get; } = JobPolicyMetadata ?? new JobExecutionAttributes(Type);
private JobExecutionAttributes JobPolicyMetadata => jobPolicyMetadata ?? new JobExecutionAttributes(Type);
public RetryPolicyAttribute? RetryPolicy => JobPolicyMetadata.RetryPolicy;
public SupportsConcurrencyAttribute? ConcurrencyPolicy => JobPolicyMetadata.ConcurrencyPolicy;

// Hooks for specific state changes
public Action<JobDefinition>? OnCompletion { get; set; }
public Action<JobDefinition, string?>? OnFailure { get; set; }
public Action<JobDefinition>? OnRunning { get; set; }

public bool Equals(JobDefinition? other) => JobDefinitionEqualityComparer.Instance.Equals(this, other);

public override bool Equals(object? obj)
{
return Equals(obj as JobDefinition);
}

public override int GetHashCode()
{
return JobDefinitionEqualityComparer.Instance.GetHashCode(this);
}
}
25 changes: 25 additions & 0 deletions src/NCronJob/Registry/JobDefinitionEqualityComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using NCronJob;

namespace NCronJob.Registry;

internal sealed class JobDefinitionEqualityComparer : IEqualityComparer<JobDefinition>
{
public static readonly JobDefinitionEqualityComparer Instance = new();

public bool Equals(JobDefinition? x, JobDefinition? y) =>
x is null && y is null || x is not null && y is not null
&& x.Type == y.Type && x.Type != typeof(DynamicJobFactory)
&& x.Parameter == y.Parameter
&& x.CronExpression == y.CronExpression
//&& x.TimeZone == y.TimeZone
&& x.CustomName == y.CustomName
&& x.IsStartupJob == y.IsStartupJob;

public int GetHashCode(JobDefinition obj) => HashCode.Combine(
obj.Type,
obj.Parameter,
obj.CronExpression,
//obj.TimeZone,
obj.CustomName,
obj.IsStartupJob);
}
Loading

0 comments on commit 0ca85a0

Please sign in to comment.