Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Extend Hangfire Plugin with JobManager, Queue Support, and EF Health Check Integration #62

Merged
merged 5 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace PiBox.Hosting.Abstractions.Extensions
{
public static class EnumerableExtensions
{
public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
{
foreach (var item in source)
action(item);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace PiBox.Hosting.Abstractions.Plugins
{
public interface IPluginConfigurator : IPluginActivateable;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System.Collections;
using System.Reflection;
using Microsoft.Extensions.Configuration;
using PiBox.Hosting.Abstractions.Attributes;
using PiBox.Hosting.Abstractions.Extensions;
using PiBox.Hosting.Abstractions.Plugins;
using PiBox.Hosting.Abstractions.Services;

namespace PiBox.Hosting.WebHost.Services
Expand Down Expand Up @@ -36,6 +38,34 @@ private object GetArgument(Type instanceType, Type type)
if (type.HasAttribute<ConfigurationAttribute>())
return GetConfiguration(type, type.GetAttribute<ConfigurationAttribute>()!.Section);

var isList = type.GetInterfaces().Any(i => i.IsAssignableTo(typeof(IEnumerable)));
var innerType = !isList ? type : type.GetElementType() ?? type.GenericTypeArguments[0]!;

if (innerType.IsInterface && innerType.IsAssignableTo(typeof(IPluginConfigurator)))
{
if (!isList)
throw new NotSupportedException(
$"For plugin configurators you must implement to take in an enumerable of the configurators. Configurator: {innerType.Name}");
var args = _resolvedTypes.Select(x => x.Assembly).Distinct()
.SelectMany(x => x.GetTypes())
.Where(x => x.IsClass && !x.IsAbstract && x.IsAssignableTo(innerType))
.Select(ResolveInstance)
.ToList();

if (type.IsArray)
{
var array = Array.CreateInstance(innerType, args.Count);
args.ToArray().CopyTo(array, 0);
return array;
}

var listType = typeof(List<>).MakeGenericType(innerType);
var list = (IList)Activator.CreateInstance(listType)!;
foreach (var item in args)
list.Add(item);
return list;
}

if (!(type.IsGenericType && _defaultArguments.ContainsKey(type.GetGenericTypeDefinition())))
{
return null;
Expand Down Expand Up @@ -69,9 +99,9 @@ public object ResolveInstance(Type type)
if (type.HasAttribute<ConfigurationAttribute>())
return GetConfiguration(type, type.GetAttribute<ConfigurationAttribute>()!.Section);
var constructor = type.GetConstructors().FirstOrDefault();
var parameters = constructor?.GetParameters() ?? Array.Empty<ParameterInfo>();
var parameters = constructor?.GetParameters() ?? [];
if (constructor is null || parameters.Length == 0)
return TrackInstance(Activator.CreateInstance(type, Array.Empty<object>()));
return TrackInstance(Activator.CreateInstance(type, []));
var arguments = parameters.Select(parameter => GetArgument(type, parameter.ParameterType)).ToArray();
return TrackInstance(constructor.Invoke(arguments));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using NUnit.Framework;
using PiBox.Hosting.Abstractions.Attributes;
using PiBox.Hosting.Abstractions.Extensions;
using PiBox.Hosting.Abstractions.Plugins;
using PiBox.Hosting.Abstractions.Services;
using PiBox.Hosting.WebHost.Services;
using PiBox.Testing;
Expand All @@ -12,7 +13,7 @@ namespace PiBox.Hosting.WebHost.Tests
{
public class TypeImplementationResolverTests
{
private readonly Type[] _resolvedTypes = new Type[] { typeof(SampleType), typeof(WithoutCtor), typeof(UnitTestPluginConfig) };
private readonly Type[] _resolvedTypes = [typeof(SampleType), typeof(WithoutCtor), typeof(UnitTestPluginConfig)];
private readonly IConfiguration _configuration = Substitute.For<IConfiguration>();

[Test]
Expand Down Expand Up @@ -83,17 +84,46 @@ public void CanResolvePluginConfigurations()
pluginConfig!.Name.Should().Be(configName);
}

[Test]
public void CanResolveConfigurators()
{
var resolver = new TypeImplementationResolver(_configuration, _resolvedTypes, new Dictionary<Type, object>());
var instance = resolver.ResolveInstance(typeof(Configurator)) as Configurator;
instance.Should().NotBeNull();
instance!.GetMessage().Should().Be("Hello World!");

var plugin = resolver.ResolveInstance(typeof(ConfiguratorPlugin)) as ConfiguratorPlugin;
plugin.Should().NotBeNull();
plugin!.Message.Should().Be("Hello World!");
plugin!.Message2.Should().Be("Hello World!");
}

private class Configurator : IConfiguratorPluginConfigurator
{
public string GetMessage() => "Hello World!";
}

private interface IConfiguratorPluginConfigurator : IPluginConfigurator
{
string GetMessage();
}

private class ConfiguratorPlugin(IConfiguratorPluginConfigurator[] configurators, IList<IConfiguratorPluginConfigurator> configurators2) : IPluginActivateable
{
public string Message = string.Join(" ", configurators.Select(c => c.GetMessage()));
public string Message2 = string.Join(" ", configurators2.Select(c => c.GetMessage()));
}

[Configuration("sampleConfig")]
internal class UnitTestPluginConfig
{
public string Name { get; set; } = null!;
}

internal abstract class BaseClass
{
}
private abstract class BaseClass;

#pragma warning disable S3881
internal class SampleType : BaseClass, IDisposable
private class SampleType : BaseClass, IDisposable
{
public static int CreationCount;
public static int DisposeCount;
Expand Down Expand Up @@ -121,7 +151,7 @@ protected virtual void Dispose(bool disposing)
}
#pragma warning restore S3881

internal class WithoutCtor
private class WithoutCtor
{
private readonly string Test = "TEST";
public string GetTest() => Test;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace PiBox.Plugins.Jobs.Hangfire.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
public class JobTimeZoneAttribute : Attribute
{
public JobTimeZoneAttribute(TimeZoneInfo timeZoneInfo)
{
TimeZoneInfo = timeZoneInfo;
}

public TimeZoneInfo TimeZoneInfo { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace PiBox.Plugins.Jobs.Hangfire.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
public class JobTimeoutAttribute : Attribute
{
public JobTimeoutAttribute(int timeout, TimeUnit unit)
{
switch (unit)
{
case TimeUnit.Milliseconds:
Timeout = TimeSpan.FromMilliseconds(timeout);
break;
case TimeUnit.Seconds:
Timeout = TimeSpan.FromSeconds(timeout);
break;
case TimeUnit.Minutes:
Timeout = TimeSpan.FromMinutes(timeout);
break;
case TimeUnit.Hours:
Timeout = TimeSpan.FromHours(timeout);
break;
case TimeUnit.Days:
Timeout = TimeSpan.FromDays(timeout);
break;
default:
throw new ArgumentOutOfRangeException(nameof(unit), unit, null);
}
}

public TimeSpan Timeout { get; }
}

public enum TimeUnit
{
Milliseconds,
Seconds,
Minutes,
Hours,
Days
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
using Hangfire.States;

namespace PiBox.Plugins.Jobs.Hangfire.Attributes
{
[AttributeUsage(AttributeTargets.Class)]
public class RecurringJobAttribute : Attribute
{
public RecurringJobAttribute(string cronPattern)
public RecurringJobAttribute(string cronPattern, string queue = EnqueuedState.DefaultQueue)
{
CronPattern = cronPattern;
Queue = queue;
}

public string CronPattern { get; }
public string Queue { get; }
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using System.Text.Json;
using Hangfire;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
using Hangfire.Storage.Monitoring;
using PiBox.Plugins.Jobs.Hangfire.Extensions;

namespace PiBox.Plugins.Jobs.Hangfire.Attributes
{
public class UniquePerQueueAttribute : JobFilterAttribute, IElectStateFilter
{
private static readonly JsonSerializerOptions _jsonSerializerOptions = new() { IncludeFields = false };
public string Queue { get; set; }

public bool CheckScheduledJobs { get; set; }
Expand All @@ -23,66 +22,42 @@ public UniquePerQueueAttribute(string queue)

private IEnumerable<JobEntity> GetJobs(ElectStateContext context)
{
IMonitoringApi monitoringApi = context.Storage.GetMonitoringApi();
List<JobEntity> jobs =
new List<JobEntity>();
foreach ((string key, EnqueuedJobDto enqueuedJobDto1) in monitoringApi.EnqueuedJobs(Queue, 0, 500))
{
string id = key;
EnqueuedJobDto enqueuedJobDto2 = enqueuedJobDto1;
jobs.Add(JobEntity.Parse(id, enqueuedJobDto2.Job));
}
var monitoringApi = context.Storage.GetMonitoringApi();
var jobs = new List<JobEntity>();
foreach (var (key, enqueuedJobDto1) in monitoringApi.GetCompleteList((api, page) => api.EnqueuedJobs(Queue, page.Offset, page.PageSize)))
jobs.Add(JobEntity.Parse(key, enqueuedJobDto1.Job));

if (CheckScheduledJobs)
{
foreach (KeyValuePair<string, ScheduledJobDto> pair in monitoringApi.ScheduledJobs(0, 500))
{
string id = pair.Key;
ScheduledJobDto scheduledJobDto3 = pair.Value;
foreach (var (id, scheduledJobDto3) in monitoringApi.GetCompleteList((api, page) => api.ScheduledJobs(page.Offset, page.PageSize)))
jobs.Add(JobEntity.Parse(id, scheduledJobDto3.Job));
}
}

if (!CheckRunningJobs)
{
return jobs;
}

foreach (KeyValuePair<string, ProcessingJobDto> pair in
monitoringApi.ProcessingJobs(0, 500))
{
string id = pair.Key;
ProcessingJobDto processingJobDto3 = pair.Value;
foreach (var (id, processingJobDto3) in monitoringApi.GetCompleteList((api, page) => api.ProcessingJobs(page.Offset, page.PageSize)))
jobs.Add(JobEntity.Parse(id, processingJobDto3.Job));
}

return jobs;
}

public void OnStateElection(ElectStateContext context)
{
if (!(context.CandidateState is EnqueuedState candidateState))
{
if (context.CandidateState is not EnqueuedState candidateState)
return;
}

candidateState.Queue = Queue;
BackgroundJob job = context.BackgroundJob;
var job = context.BackgroundJob;
var filteredArguments = job.Job.Args.Where(x => x.GetType() != typeof(CancellationToken)).ToList();
var jobArgs = JsonSerializer.Serialize(filteredArguments,
new JsonSerializerOptions() { IncludeFields = false });
var jobArgs = JsonSerializer.Serialize(filteredArguments, _jsonSerializerOptions);
var jobs = GetJobs(context);
var jobsWithArgs = jobs
.Select(x => new { JobEntity = x, ArgAsString = jobArgs }).ToList();
var alreadyExists = jobsWithArgs.Exists(x =>
x.JobEntity.Value.Method == job.Job.Method && x.ArgAsString == jobArgs && x.JobEntity.Id != job.Id);
if (!alreadyExists)
{
return;
}

context.CandidateState =
new DeletedState() { Reason = "Instance of the same job is already queued." };
new DeletedState { Reason = "Instance of the same job is already queued." };
}

private sealed record JobEntity(string Id, global::Hangfire.Common.Job Value)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Hangfire.Storage;

namespace PiBox.Plugins.Jobs.Hangfire.Extensions
{
public static class MonitoringApiExtensions
{
private const int PageSize = 500;
public static IList<T> GetCompleteList<T>(this IMonitoringApi api, Func<IMonitoringApi, HangfirePageOptions, IEnumerable<T>> action)
{
var pageOpts = new HangfirePageOptions(0, PageSize);
var result = new List<T>();
while (true)
{
var partialResult = action(api, pageOpts).ToList();
result.AddRange(partialResult);
if (partialResult.Count < pageOpts.PageSize)
break;
pageOpts = pageOpts.Next();
}
return result;
}
}

public record HangfirePageOptions(int Offset, int PageSize)
{
public HangfirePageOptions Next() => this with { Offset = Offset + PageSize };
}
}
Loading
Loading