diff --git a/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs b/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs index 34fe772..79a9791 100755 --- a/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs +++ b/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs @@ -89,8 +89,8 @@ public ActionResult Delayed(int id) public ActionResult Recurring() { - RecurringJob.AddOrUpdate("recurring-job", - () => Recurring($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely); + RecurringJob.AddOrUpdate("my-recurring-job", + j => j.Recurring($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely); return RedirectToAction("Index"); } @@ -100,10 +100,5 @@ public static void PrintToDebug(string message) Debug.WriteLine(message); } - public static void Recurring(string message) - { - Thread.Sleep(15000); - Debug.WriteLine(message); - } } } \ No newline at end of file diff --git a/src/Hangfire.Mongo.Sample.ASPNetCore/MyRecurringjob.cs b/src/Hangfire.Mongo.Sample.ASPNetCore/MyRecurringjob.cs new file mode 100644 index 0000000..04fb391 --- /dev/null +++ b/src/Hangfire.Mongo.Sample.ASPNetCore/MyRecurringjob.cs @@ -0,0 +1,18 @@ +using System; +using System.Diagnostics; +using System.Threading; + +namespace Hangfire.Mongo.Sample.ASPNetCore; + +[Queue("not-default")] +[AutomaticRetry(Attempts = 0, LogEvents = true, OnAttemptsExceeded = AttemptsExceededAction.Delete)] +[SkipWhenPreviousJobIsRunning] +public class MyRecurringjob +{ + // [DisableConcurrentExecution("{0}", 3)] + public void Recurring(string message) + { + Thread.Sleep(TimeSpan.FromMinutes((1))); + Console.WriteLine(message); + } +} \ No newline at end of file diff --git a/src/Hangfire.Mongo.Sample.ASPNetCore/SkipWhenPreviousJobIsRunningAttribute.cs b/src/Hangfire.Mongo.Sample.ASPNetCore/SkipWhenPreviousJobIsRunningAttribute.cs new file mode 100644 index 0000000..0a83381 --- /dev/null +++ b/src/Hangfire.Mongo.Sample.ASPNetCore/SkipWhenPreviousJobIsRunningAttribute.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Generic; +using Hangfire.Client; +using Hangfire.Common; +using Hangfire.States; +using Hangfire.Storage; + +namespace Hangfire.Mongo.Sample.ASPNetCore; + +// Copied from here: https://gist.github.com/odinserj/a6ad7ba6686076c9b9b2e03fcf6bf74e +// TODO: Add to Framework-Common's HangfireLocal NuGet package +public class SkipWhenPreviousJobIsRunningAttribute : JobFilterAttribute, IClientFilter, IApplyStateFilter +{ + private const string Running = "Running"; + private const string RecurringJobParam = "RecurringJobId"; + private const string KeyPrefix = "recurring-job:"; + private const string Yes = "yes"; + private const string No = "no"; + + public void OnCreating(CreatingContext context) + { + Console.WriteLine($"OnCreating: Queue: {context.Job.Queue}, Canceled: {context.Canceled}"); + // We can't handle old storages + if (context.Connection is not JobStorageConnection connection) + { + return; + } + + // We should run this filter only for background jobs based on recurring ones + if (!context.Parameters.ContainsKey(RecurringJobParam)) + { + return; + } + + var recurringJobId = context.Parameters[RecurringJobParam] as string; + + // RecurringJobId is malformed. This should not happen, but anyway. + if (string.IsNullOrWhiteSpace(recurringJobId)) + { + return; + } + + var running = connection.GetValueFromHash($"{KeyPrefix}{recurringJobId}", Running); + if (running?.Equals(Yes, StringComparison.OrdinalIgnoreCase) == true) + { + Console.WriteLine($"OnCreating: Setting Canceled: true"); + context.Canceled = true; + } + } + + public void OnCreated(CreatedContext filterContext) + { + } + + public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction) + { + Console.WriteLine($"OnStateApplied: NewState: {context.NewState.Name}"); + if (context.NewState is EnqueuedState) + { + var recurringJobId = SerializationHelper.Deserialize( + context.Connection.GetJobParameter(context.BackgroundJob.Id, RecurringJobParam)); + if (string.IsNullOrWhiteSpace(recurringJobId)) + { + return; + } + + Console.WriteLine($"OnStateApplied: Setting: {Running}:{Yes}"); + transaction.SetRangeInHash( + $"{KeyPrefix}{recurringJobId}", + new[] {new KeyValuePair(Running, Yes)}); + } + else if ((context.NewState.IsFinal && + !FailedState.StateName.Equals(context.OldStateName, StringComparison.OrdinalIgnoreCase)) || + (context.NewState is FailedState)) + { + var recurringJobId = + SerializationHelper.Deserialize( + context.Connection.GetJobParameter(context.BackgroundJob.Id, RecurringJobParam)); + if (string.IsNullOrWhiteSpace(recurringJobId)) + { + return; + } + Console.WriteLine($"OnStateApplied: Setting: {Running}:{No}"); + transaction.SetRangeInHash( + $"{KeyPrefix}{recurringJobId}", + new[] {new KeyValuePair(Running, No)}); + } + } + + public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction) + { + } +} \ No newline at end of file diff --git a/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs b/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs index d0ecfbc..47f12b0 100755 --- a/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs +++ b/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs @@ -1,3 +1,4 @@ +using System; using Hangfire.Mongo.Migration.Strategies; using Hangfire.Mongo.Migration.Strategies.Backup; using Microsoft.AspNetCore.Builder; @@ -49,17 +50,17 @@ public void ConfigureServices(IServiceCollection services) MigrationStrategy = new MigrateMongoMigrationStrategy(), BackupStrategy = new CollectionMongoBackupStrategy() }, - CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.Watch, + SlidingInvisibilityTimeout = TimeSpan.FromSeconds(5) }; //config.UseLogProvider(new FileLogProvider()); config.SetDataCompatibilityLevel(CompatibilityLevel.Version_180); config.UseMongoStorage(mongoClient, mongoUrlBuilder.DatabaseName, storageOptions) - .UseColouredConsoleLogProvider(LogLevel.Info); + .UseColouredConsoleLogProvider(LogLevel.Trace); }); services.AddHangfireServer(options => { - options.Queues = new[] { "default", "notDefault" }; + options.Queues = new[] { "default", "not-default" }; }); services.AddMvc(c => c.EnableEndpointRouting = false); diff --git a/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs b/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs index 235a1f7..d0950c6 100644 --- a/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs +++ b/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs @@ -4,6 +4,7 @@ using Hangfire.Mongo.Database; using Hangfire.Mongo.Dto; using Hangfire.Mongo.Tests.Utils; +using Hangfire.States; using MongoDB.Bson; using MongoDB.Driver; using Xunit; @@ -142,7 +143,7 @@ public void Heartbeat_LonRunningJob_UpdatesFetchedAt() var options = new MongoStorageOptions() {SlidingInvisibilityTimeout = TimeSpan.FromSeconds(1)}; var queue = "default"; var jobId = ObjectId.GenerateNewId(); - var id = CreateJobQueueRecord(_dbContext, jobId, queue, _fetchedAt); + var id = CreateJobQueueRecord(_dbContext, jobId, queue, _fetchedAt, ProcessingState.StateName); var initialFetchedAt = DateTime.UtcNow; // Act @@ -155,18 +156,24 @@ public void Heartbeat_LonRunningJob_UpdatesFetchedAt() Assert.True(job.FetchedAt > initialFetchedAt, "Expected job FetchedAt field to be updated"); } - private ObjectId CreateJobQueueRecord(HangfireDbContext connection, ObjectId jobId, string queue, DateTime? fetchedAt) + private ObjectId CreateJobQueueRecord( + HangfireDbContext connection, + ObjectId jobId, + string queue, + DateTime? fetchedAt, + string stateName = null) { - var jobQueue = new JobDto + var job = new JobDto { Id = jobId, Queue = queue, - FetchedAt = fetchedAt + FetchedAt = fetchedAt, + StateName = stateName }; - connection.JobGraph.InsertOne(jobQueue.Serialize()); + connection.JobGraph.InsertOne(job.Serialize()); - return jobQueue.Id; + return job.Id; } } #pragma warning restore 1591 diff --git a/src/Hangfire.Mongo/Database/HangfireDbContext.cs b/src/Hangfire.Mongo/Database/HangfireDbContext.cs index 6e9fdd1..caa2eb5 100644 --- a/src/Hangfire.Mongo/Database/HangfireDbContext.cs +++ b/src/Hangfire.Mongo/Database/HangfireDbContext.cs @@ -1,5 +1,4 @@ using System; -using Hangfire.Mongo.Dto; using MongoDB.Bson; using MongoDB.Driver; diff --git a/src/Hangfire.Mongo/MongoConnection.cs b/src/Hangfire.Mongo/MongoConnection.cs index 00d8abf..6549461 100644 --- a/src/Hangfire.Mongo/MongoConnection.cs +++ b/src/Hangfire.Mongo/MongoConnection.cs @@ -52,7 +52,8 @@ public override string CreateExpiredJob(Job job, IDictionary par TimeSpan expireIn) { string jobId; - using (var transaction = _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_dbContext, _storageOptions)) + using (var transaction = + _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_dbContext, _storageOptions)) { jobId = transaction.CreateExpiredJob(job, parameters, createdAt, expireIn); transaction.Commit(); @@ -73,7 +74,8 @@ public override IFetchedJob FetchNextJob(string[] queues, CancellationToken canc public override void SetJobParameter(string id, string name, string value) { - using (var transaction = _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_dbContext, _storageOptions)) + using (var transaction = + _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_dbContext, _storageOptions)) { transaction.SetJobParameter(id, name, value); transaction.Commit(); @@ -91,6 +93,7 @@ public override string GetJobParameter(string id, string name) { throw new ArgumentNullException(nameof(name)); } + var objectId = ObjectId.Parse(id); var job = _dbContext .JobGraph @@ -235,17 +238,16 @@ public override void Heartbeat(string serverId) throw new ArgumentNullException(nameof(serverId)); } - var now = GetUtcDateTime(); - var updateResult = _dbContext.Server.UpdateMany(new BsonDocument("_id", serverId), - new BsonDocument + var update = new BsonDocument + { + ["$currentDate"] = new BsonDocument { - ["$set"] = new BsonDocument - { - [nameof(ServerDto.LastHeartbeat)] = now - } - }); + [nameof(ServerDto.LastHeartbeat)] = true + } + }; + var updateResult = _dbContext.Server.UpdateOne(new BsonDocument("_id", serverId), update); - if (updateResult != null && updateResult.IsAcknowledged && updateResult.ModifiedCount == 0) + if (updateResult is {IsAcknowledged: true, ModifiedCount: 0}) { throw new BackgroundServerGoneException(); } @@ -258,14 +260,13 @@ public override int RemoveTimedOutServers(TimeSpan timeOut) throw new ArgumentException("The `timeOut` value must be positive.", nameof(timeOut)); } - var now = GetUtcDateTime(); - return (int)_dbContext + return (int) _dbContext .Server .DeleteMany(new BsonDocument { [nameof(ServerDto.LastHeartbeat)] = new BsonDocument { - ["$lt"] = now.Add(timeOut.Negate()) + ["$lt"] = DateTime.UtcNow.Add(timeOut.Negate()) } }) .DeletedCount; @@ -302,7 +303,8 @@ public override string GetFirstByLowestScoreFromSet(string key, double fromScore return GetFirstByLowestScoreFromSet(key, fromScore, toScore, 1).FirstOrDefault(); } - public override List GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore, int count) + public override List GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore, + int count) { if (Logger.IsTraceEnabled()) { @@ -346,7 +348,8 @@ public override void SetRangeInHash(string key, IEnumerable GetAllItemsFromList(string key) { throw new ArgumentNullException(nameof(key)); } + return _dbContext .JobGraph.Find(new BsonDocument { @@ -670,30 +677,27 @@ public override List GetAllItemsFromList(string key) .Select(b => b[nameof(ListDto.Value)].AsString).ToList(); } + private static bool _useServerStatus; + public override DateTime GetUtcDateTime() { + if (_useServerStatus) + { + return GetUtcDateUsingServerStatus(); + } DateTime now; try { - var pipeline = new[] - { - new BsonDocument("$project", new BsonDocument("date", "$$NOW")) - }; - // we should always have a schema document in the db, and this - var time = _dbContext.Schema.Aggregate(pipeline).FirstOrDefault(); - if (time is null) - { - throw new InvalidOperationException("No documents in the schema collection"); - } - now = time["date"].ToUniversalTime(); + now = GetUtcDateUsingAggregation(); } - catch (Exception e) + catch (Exception) { - Logger.WarnException("Failed to get UTC datetime from mongodb server, using local UTC", e); - now = DateTime.UtcNow; + Logger.Warn("Failed to get UTC datetime from mongodb server, using 'serverStatus' instead"); + now = GetUtcDateUsingServerStatus(); + _useServerStatus = true; } - + if (Logger.IsTraceEnabled()) { Logger.Trace($"GetUtcDateTime() => {now:O}"); @@ -702,6 +706,28 @@ public override DateTime GetUtcDateTime() return now; } + private DateTime GetUtcDateUsingAggregation() + { + var pipeline = new[] + { + new BsonDocument("$project", new BsonDocument("date", "$$NOW")) + }; + // we should always have a schema document in the db, and this + var time = _dbContext.Schema.Aggregate(pipeline).FirstOrDefault(); + if (time is null) + { + throw new InvalidOperationException("No documents in the schema collection"); + } + + return time["date"].ToUniversalTime(); + } + + private DateTime GetUtcDateUsingServerStatus() + { + var serverStatus = _dbContext.Database.RunCommand(new BsonDocument("serverStatus", 1)); + return serverStatus["localTime"].ToUniversalTime(); + } + public override bool GetSetContains([NotNull] string key, [NotNull] string value) { if (Logger.IsTraceEnabled()) diff --git a/src/Hangfire.Mongo/MongoFetchedJob.cs b/src/Hangfire.Mongo/MongoFetchedJob.cs index 133c198..9122099 100644 --- a/src/Hangfire.Mongo/MongoFetchedJob.cs +++ b/src/Hangfire.Mongo/MongoFetchedJob.cs @@ -6,6 +6,7 @@ using Hangfire.Logging; using Hangfire.Mongo.Database; using Hangfire.Mongo.Dto; +using Hangfire.States; using Hangfire.Storage; using MongoDB.Bson; using MongoDB.Driver; @@ -136,7 +137,11 @@ public virtual void Dispose() private void StartHeartbeat(TimeSpan slidingInvisibilityTimeout) { var timerInterval = TimeSpan.FromSeconds(slidingInvisibilityTimeout.TotalSeconds / 5); - var filter = new BsonDocument("_id", Id); + var filter = new BsonDocument + { + ["_id"] = _id, + [nameof(JobDto.StateName)] = ProcessingState.StateName + }; var update = new BsonDocument { ["$currentDate"] = new BsonDocument @@ -178,7 +183,11 @@ private void StartHeartbeat(TimeSpan slidingInvisibilityTimeout) try { var result = _db.JobGraph.FindOneAndUpdate(filter, update, options); - _fetchedAt = result[nameof(JobDto.FetchedAt)].ToUniversalTime(); + if (result != null && result.TryGetValue(nameof(JobDto.FetchedAt), out var updatedFetchedAt)) + { + _fetchedAt = updatedFetchedAt.ToUniversalTime(); + } + if (Logger.IsTraceEnabled() && sw != null) { var serializedModel = new Dictionary diff --git a/src/Hangfire.Mongo/MongoJobFetcher.cs b/src/Hangfire.Mongo/MongoJobFetcher.cs index 22e3b9e..98d102f 100644 --- a/src/Hangfire.Mongo/MongoJobFetcher.cs +++ b/src/Hangfire.Mongo/MongoJobFetcher.cs @@ -100,11 +100,6 @@ public virtual IFetchedJob FetchNextJob(string[] queues, CancellationToken cance /// public virtual MongoFetchedJob TryAllQueues(string[] queues, CancellationToken cancellationToken) { - if (Logger.IsTraceEnabled()) - { - Logger.Trace($"Try fetch from queues: {string.Join(",", queues)} Thread[{Thread.CurrentThread.ManagedThreadId}]"); - } - foreach (var queue in queues) { var fetchedJob = TryGetEnqueuedJob(queue, cancellationToken); @@ -137,6 +132,7 @@ public virtual MongoFetchedJob TryGetEnqueuedJob(string queue, CancellationToken new BsonDocument(nameof(JobDto.FetchedAt), new BsonDocument("$lt", date)) }); } + var filter = new BsonDocument("$and", new BsonArray { new BsonDocument(nameof(JobDto.Queue), queue), diff --git a/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs b/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs index db26432..5e9fb78 100644 --- a/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs +++ b/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs @@ -36,7 +36,7 @@ public MongoWriteOnlyTransaction(HangfireDbContext dbContext, MongoStorageOption { StorageOptions = storageOptions; DbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext)); - JobsAddedToQueue = new HashSet(); + JobsAddedToQueue = []; } private MongoJobUpdates GetOrAddJobUpdates(string jobId) @@ -48,7 +48,7 @@ private MongoJobUpdates GetOrAddJobUpdates(string jobId) return updates; } - + public override void Dispose() { _distributedLock?.Dispose(); @@ -165,13 +165,6 @@ public override void SetJobState(string jobId, IState state) var updates = GetOrAddJobUpdates(jobId); updates.Set[nameof(JobDto.StateName)] = state.Name; - - // if job is enqueued, we need to reset the FetchedAt value so it will be picked up by the queue - if (state is EnqueuedState) - { - updates.Set[nameof(JobDto.FetchedAt)] = BsonNull.Value; - } - updates.Pushes.Add(new BsonDocument { [nameof(JobDto.StateHistory)] = stateDto @@ -225,6 +218,7 @@ public override void AddToQueue(string queue, string jobId) { var updates = GetOrAddJobUpdates(jobId); updates.Set[nameof(JobDto.Queue)] = queue; + updates.Set[nameof(JobDto.FetchedAt)] = BsonNull.Value; JobsAddedToQueue.Add(queue); } @@ -498,7 +492,6 @@ protected virtual void ExecuteCommit( Logger.ErrorException("MongoWriteOnlyTransaction failed", e); throw; } - } protected virtual void Log(IList> writeModels, long elapsedMilliseconds)