diff --git a/ChangeLog.md b/ChangeLog.md index 55610de..151d01f 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,6 +1,13 @@ ## Change log +### v1.10.0 +- Update to Hangfire 1.8.9 +- Use SlidingInvisibilityTimeout to determine whether a background job is still alive +- BREAKING: Remove InvisibilityTimeout +- Use server time for distributed lock instead of Datetime.UtcNow +- Fixed Job not requeued when requeued while in processing state (#380) + ### v1.9.16 - Update to Hangfire 1.8.7 - Update to MongoDB 2.23.1 diff --git a/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs b/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs index c4d49bf..34fe772 100755 --- a/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs +++ b/src/Hangfire.Mongo.Sample.ASPNetCore/Controllers/HomeController.cs @@ -90,7 +90,7 @@ public ActionResult Delayed(int id) public ActionResult Recurring() { RecurringJob.AddOrUpdate("recurring-job", - () => PrintToDebug($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely); + () => Recurring($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely); return RedirectToAction("Index"); } @@ -99,5 +99,11 @@ public static void PrintToDebug(string message) { Debug.WriteLine(message); } + + public static void Recurring(string message) + { + Thread.Sleep(15000); + Debug.WriteLine(message); + } } } \ No newline at end of file diff --git a/src/Hangfire.Mongo.Sample.ASPNetCore/Hangfire.Mongo.Sample.ASPNetCore.csproj b/src/Hangfire.Mongo.Sample.ASPNetCore/Hangfire.Mongo.Sample.ASPNetCore.csproj index 2e98649..eca7e19 100644 --- a/src/Hangfire.Mongo.Sample.ASPNetCore/Hangfire.Mongo.Sample.ASPNetCore.csproj +++ b/src/Hangfire.Mongo.Sample.ASPNetCore/Hangfire.Mongo.Sample.ASPNetCore.csproj @@ -2,7 +2,7 @@ net6.0 - 1.9.16 + 1.10.0 MongoDB storage implementation for Hangfire (background job system for ASP.NET applications). Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau Sergey Zwezdin, Martin Lobger, Jonas Gottschau @@ -21,8 +21,8 @@ - - + + diff --git a/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs b/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs index cc2f3bc..d0ecfbc 100755 --- a/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs +++ b/src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs @@ -1,4 +1,3 @@ -using System; using Hangfire.Mongo.Migration.Strategies; using Hangfire.Mongo.Migration.Strategies.Backup; using Microsoft.AspNetCore.Builder; @@ -51,7 +50,6 @@ public void ConfigureServices(IServiceCollection services) BackupStrategy = new CollectionMongoBackupStrategy() }, CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.Watch, - InvisibilityTimeout = TimeSpan.FromMinutes(5) }; //config.UseLogProvider(new FileLogProvider()); diff --git a/src/Hangfire.Mongo.Sample.CosmosDB/Hangfire.Mongo.Sample.CosmosDB.csproj b/src/Hangfire.Mongo.Sample.CosmosDB/Hangfire.Mongo.Sample.CosmosDB.csproj index 1cbc1a6..5751e75 100644 --- a/src/Hangfire.Mongo.Sample.CosmosDB/Hangfire.Mongo.Sample.CosmosDB.csproj +++ b/src/Hangfire.Mongo.Sample.CosmosDB/Hangfire.Mongo.Sample.CosmosDB.csproj @@ -2,7 +2,7 @@ net6.0 - 1.9.16 + 1.10.0 MongoDB storage implementation for Hangfire (background job system for ASP.NET applications). Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau Sergey Zwezdin, Martin Lobger, Jonas Gottschau @@ -21,8 +21,8 @@ - - + + diff --git a/src/Hangfire.Mongo.Sample.NETCore/Hangfire.Mongo.Sample.NETCore.csproj b/src/Hangfire.Mongo.Sample.NETCore/Hangfire.Mongo.Sample.NETCore.csproj index f8919b3..6601ae2 100644 --- a/src/Hangfire.Mongo.Sample.NETCore/Hangfire.Mongo.Sample.NETCore.csproj +++ b/src/Hangfire.Mongo.Sample.NETCore/Hangfire.Mongo.Sample.NETCore.csproj @@ -4,7 +4,7 @@ Hangfire.Mongo.Sample.NETCore Exe Hangfire.Mongo.Sample.NETCore - 1.9.16 + 1.10.0 MongoDB storage implementation for Hangfire (background job system for ASP.NET applications). Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau Sergey Zwezdin, Martin Lobger, Jonas Gottschau @@ -19,7 +19,7 @@ - + diff --git a/src/Hangfire.Mongo.Tests/Hangfire.Mongo.Tests.csproj b/src/Hangfire.Mongo.Tests/Hangfire.Mongo.Tests.csproj index b5a86d5..b09dd5e 100644 --- a/src/Hangfire.Mongo.Tests/Hangfire.Mongo.Tests.csproj +++ b/src/Hangfire.Mongo.Tests/Hangfire.Mongo.Tests.csproj @@ -35,10 +35,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - + diff --git a/src/Hangfire.Mongo.Tests/MongoDistributedLockFacts.cs b/src/Hangfire.Mongo.Tests/MongoDistributedLockFacts.cs index e9636da..62d44a3 100644 --- a/src/Hangfire.Mongo.Tests/MongoDistributedLockFacts.cs +++ b/src/Hangfire.Mongo.Tests/MongoDistributedLockFacts.cs @@ -129,10 +129,10 @@ public void Ctor_WaitForLock_SignaledAtLockRelease() }); t.Start(); - // Wait just a bit to make sure the above lock is acuired + // Wait just a bit to make sure the above lock is acquired Thread.Sleep(TimeSpan.FromSeconds(1)); - // Record when we try to aquire the lock + // Record when we try to acquire the lock var startTime = DateTime.UtcNow; var lock2 = new MongoDistributedLock("resource1", TimeSpan.FromSeconds(10), _database, new MongoStorageOptions()); @@ -165,7 +165,7 @@ public void Ctor_SetLockExpireAtWorks_WhenResourceIsNotLocked() }; using (lock1.AcquireLock()) { - DateTime initialExpireAt = DateTime.UtcNow; + var initialExpireAt = DateTime.UtcNow; Thread.Sleep(TimeSpan.FromSeconds(5)); var lockEntry = _database.DistributedLock.Find(filter).FirstOrDefault(); diff --git a/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs b/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs index 49fd60d..235a1f7 100644 --- a/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs +++ b/src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Threading; using Hangfire.Mongo.Database; using Hangfire.Mongo.Dto; using Hangfire.Mongo.Tests.Utils; @@ -132,6 +133,27 @@ public void Dispose_SetsFetchedAtValueToNull_IfThereWereNoCallsToComplete() _dbContext.JobGraph.Find(new BsonDocument("_t", nameof(JobDto))).ToList().Single()); Assert.Null(record.FetchedAt); } + + [Fact] + public void Heartbeat_LonRunningJob_UpdatesFetchedAt() + { + // Arrange + // time out job after 1s + var options = new MongoStorageOptions() {SlidingInvisibilityTimeout = TimeSpan.FromSeconds(1)}; + var queue = "default"; + var jobId = ObjectId.GenerateNewId(); + var id = CreateJobQueueRecord(_dbContext, jobId, queue, _fetchedAt); + var initialFetchedAt = DateTime.UtcNow; + + // Act + var job = new MongoFetchedJob(_dbContext, options, initialFetchedAt, id, jobId, queue); + // job runs for 2s, Heartbeat updates job + Thread.Sleep(TimeSpan.FromSeconds(2)); + job.Dispose(); + + // Assert + Assert.True(job.FetchedAt > initialFetchedAt, "Expected job FetchedAt field to be updated"); + } private ObjectId CreateJobQueueRecord(HangfireDbContext connection, ObjectId jobId, string queue, DateTime? fetchedAt) { diff --git a/src/Hangfire.Mongo.Tests/MongoJobQueueFacts.cs b/src/Hangfire.Mongo.Tests/MongoJobQueueFacts.cs index 8f358f0..6b86fda 100644 --- a/src/Hangfire.Mongo.Tests/MongoJobQueueFacts.cs +++ b/src/Hangfire.Mongo.Tests/MongoJobQueueFacts.cs @@ -163,7 +163,7 @@ public void Dequeue_ShouldFetchATimedOutJobs_FromTheSpecifiedQueue() var options = new MongoStorageOptions { - InvisibilityTimeout = TimeSpan.FromMinutes(30) + SlidingInvisibilityTimeout = TimeSpan.FromMinutes(30) }; _jobQueueSemaphoreMock.WaitNonBlock("default").Returns(true); @@ -191,7 +191,10 @@ public void Dequeue_NoInvisibilityTimeout_WaitsForever() }; _hangfireDbContext.JobGraph.InsertOne(job.Serialize()); - var options = new MongoStorageOptions(); + var options = new MongoStorageOptions + { + SlidingInvisibilityTimeout = null + }; _jobQueueSemaphoreMock.WaitNonBlock("default").Returns(true); var queue =new MongoJobFetcher(_hangfireDbContext, options, _jobQueueSemaphoreMock); diff --git a/src/Hangfire.Mongo.Tests/MongoStorageOptionsFacts.cs b/src/Hangfire.Mongo.Tests/MongoStorageOptionsFacts.cs index 0aba6a0..9390967 100644 --- a/src/Hangfire.Mongo.Tests/MongoStorageOptionsFacts.cs +++ b/src/Hangfire.Mongo.Tests/MongoStorageOptionsFacts.cs @@ -15,7 +15,7 @@ public void Ctor_SetsTheDefaultOptions() MongoStorageOptions storageOptions = new MongoStorageOptions(); Assert.Equal("hangfire", storageOptions.Prefix); - Assert.Null(storageOptions.InvisibilityTimeout); + Assert.NotNull(storageOptions.SlidingInvisibilityTimeout); } [Fact] diff --git a/src/Hangfire.Mongo/Database/HangfireDbContext.cs b/src/Hangfire.Mongo/Database/HangfireDbContext.cs index d15ac2c..6e9fdd1 100644 --- a/src/Hangfire.Mongo/Database/HangfireDbContext.cs +++ b/src/Hangfire.Mongo/Database/HangfireDbContext.cs @@ -21,7 +21,7 @@ public sealed class HangfireDbContext /// Database instance used for this db context instance /// public IMongoDatabase Database { get; } - + internal HangfireDbContext(string connectionString, string databaseName, string prefix = "hangfire") :this(new MongoClient(connectionString), databaseName, prefix) { @@ -41,7 +41,6 @@ public HangfireDbContext(IMongoClient mongoClient, string databaseName, string p ConnectionId = Guid.NewGuid().ToString(); } - /// /// Mongo database connection identifier /// diff --git a/src/Hangfire.Mongo/DistributedLock/MongoDistributedLock.cs b/src/Hangfire.Mongo/DistributedLock/MongoDistributedLock.cs index 914ea72..addab25 100644 --- a/src/Hangfire.Mongo/DistributedLock/MongoDistributedLock.cs +++ b/src/Hangfire.Mongo/DistributedLock/MongoDistributedLock.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Text; using System.Threading; using Hangfire.Logging; using Hangfire.Mongo.Database; @@ -112,11 +114,8 @@ public virtual void Dispose() { AcquiredLocks.Value.Remove(_resource); - if (_heartbeatTimer != null) - { - _heartbeatTimer.Dispose(); - _heartbeatTimer = null; - } + _heartbeatTimer?.Dispose(); + _heartbeatTimer = null; Release(); @@ -263,13 +262,15 @@ protected virtual void Cleanup() /// protected virtual void StartHeartBeat() { - TimeSpan timerInterval = TimeSpan.FromMilliseconds(_storageOptions.DistributedLockLifetime.TotalMilliseconds / 5); - _heartbeatTimer = new Timer(state => + var timerInterval = TimeSpan.FromMilliseconds(_storageOptions.DistributedLockLifetime.TotalMilliseconds / 5); + _heartbeatTimer = new Timer(_ => { // Timer callback may be invoked after the Dispose method call, // so we are using lock to avoid un synchronized calls. lock (_lockObject) { + if (_completed) return; + try { var filter = new BsonDocument @@ -278,12 +279,43 @@ protected virtual void StartHeartBeat() }; var update = new BsonDocument { - ["$set"] = new BsonDocument + [nameof(DistributedLockDto.ExpireAt)] = new BsonDocument { - [nameof(DistributedLockDto.ExpireAt)] = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime) + ["$add"] = new BsonArray + { + "$$NOW", + (int) _storageOptions.DistributedLockLifetime.TotalMilliseconds, + } } }; - _dbContext.DistributedLock.FindOneAndUpdate(filter, update); + + var pipeline = new BsonDocument[] + { + new BsonDocument("$match", filter), + new BsonDocument("$set", update) + }; + Stopwatch sw = null; + if (Logger.IsTraceEnabled()) + { + sw = Stopwatch.StartNew(); + } + + _dbContext.DistributedLock.Aggregate(pipeline).FirstOrDefault(); + + if (Logger.IsTraceEnabled() && sw != null) + { + var serializedModel = new Dictionary + { + ["Filter"] = filter, + ["Update"] = update + }; + sw.Stop(); + var builder = new StringBuilder(); + builder.AppendLine($"Lock heartbeat"); + builder.AppendLine($"{serializedModel.ToJson()}"); + builder.AppendLine($"Executed in {sw.ElapsedMilliseconds} ms"); + Logger.Trace($"{builder}"); + } } catch (Exception ex) { diff --git a/src/Hangfire.Mongo/Hangfire.Mongo.csproj b/src/Hangfire.Mongo/Hangfire.Mongo.csproj index 7e99bc5..353674c 100644 --- a/src/Hangfire.Mongo/Hangfire.Mongo.csproj +++ b/src/Hangfire.Mongo/Hangfire.Mongo.csproj @@ -1,6 +1,6 @@ - 1.9.16 + 1.10.0 netstandard2.0 $(NoWarn);CS0618 true @@ -21,9 +21,12 @@ Sergey Zwezdin, Jonas Gottschau MongoDB storage implementation for Hangfire (background job system for ASP.NET applications). Hangfire AspNet OWIN MongoDB CosmosDB Long-Running Background Fire-And-Forget Delayed Recurring Tasks Jobs Scheduler Threading Queues - 1.9.16 - - Update to Hangfire 1.8.7 - - Update to MongoDB 2.23.1 + 1.10.0 + - Update to Hangfire 1.8.9 + - Use SlidingInvisibilityTimeout to determine whether a background job is still alive + - BREAKING: Remove InvisibilityTimeout + - Use server time for distributed lock instead of Datetime.UtcNow + - Fixed Job not requeued when requeued while in processing state (#380) README.md @@ -35,7 +38,7 @@ - + diff --git a/src/Hangfire.Mongo/MongoConnection.cs b/src/Hangfire.Mongo/MongoConnection.cs index 6ea7888..00d8abf 100644 --- a/src/Hangfire.Mongo/MongoConnection.cs +++ b/src/Hangfire.Mongo/MongoConnection.cs @@ -696,7 +696,7 @@ public override DateTime GetUtcDateTime() if (Logger.IsTraceEnabled()) { - Logger.Trace($"GetUtcDateTime() => {now}"); + Logger.Trace($"GetUtcDateTime() => {now:O}"); } return now; diff --git a/src/Hangfire.Mongo/MongoFetchedJob.cs b/src/Hangfire.Mongo/MongoFetchedJob.cs index b6271af..133c198 100644 --- a/src/Hangfire.Mongo/MongoFetchedJob.cs +++ b/src/Hangfire.Mongo/MongoFetchedJob.cs @@ -1,7 +1,14 @@ using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; +using Hangfire.Logging; using Hangfire.Mongo.Database; +using Hangfire.Mongo.Dto; using Hangfire.Storage; using MongoDB.Bson; +using MongoDB.Driver; namespace Hangfire.Mongo { @@ -10,16 +17,19 @@ namespace Hangfire.Mongo /// public class MongoFetchedJob : IFetchedJob { + private static readonly ILog Logger = LogProvider.For(); + private readonly HangfireDbContext _db; private readonly MongoStorageOptions _storageOptions; - private readonly DateTime _fetchedAt; + private DateTime _fetchedAt; private readonly ObjectId _id; + private readonly object _syncRoot; private bool _disposed; - private bool _removedFromQueue; - private bool _requeued; + private Timer _heartbeatTimer; + /// /// Constructs fetched job by database connection, identifier, job ID and queue @@ -31,19 +41,24 @@ public class MongoFetchedJob : IFetchedJob /// Job ID /// Queue name public MongoFetchedJob( - HangfireDbContext db, - MongoStorageOptions storageOptions, + HangfireDbContext db, + MongoStorageOptions storageOptions, DateTime fetchedAt, - ObjectId id, - ObjectId jobId, + ObjectId id, + ObjectId jobId, string queue) { _db = db ?? throw new ArgumentNullException(nameof(db)); + _syncRoot = new object(); _storageOptions = storageOptions; _fetchedAt = fetchedAt; _id = id; JobId = jobId.ToString(); Queue = queue ?? throw new ArgumentNullException(nameof(queue)); + if (storageOptions.SlidingInvisibilityTimeout.HasValue) + { + StartHeartbeat(storageOptions.SlidingInvisibilityTimeout.Value); + } } /// @@ -71,11 +86,9 @@ public MongoFetchedJob( /// public virtual void RemoveFromQueue() { - using (var transaction = _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_db, _storageOptions)) - { - transaction.RemoveFromQueue(_id, _fetchedAt, Queue); - transaction.Commit(); - } + using var t = _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_db, _storageOptions); + t.RemoveFromQueue(_id, _fetchedAt, Queue); + t.Commit(); SetRemoved(); } @@ -92,11 +105,9 @@ public virtual void SetRemoved() /// public virtual void Requeue() { - using (var transaction = _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_db, _storageOptions)) - { - transaction.Requeue(_id, Queue); - transaction.Commit(); - } + using var t = _storageOptions.Factory.CreateMongoWriteOnlyTransaction(_db, _storageOptions); + t.Requeue(_id, Queue); + t.Commit(); _requeued = true; } @@ -105,13 +116,90 @@ public virtual void Requeue() /// public virtual void Dispose() { - if (_disposed) return; + lock (_syncRoot) + { + if (_disposed) return; + + _heartbeatTimer?.Dispose(); + _heartbeatTimer = null; + + _disposed = true; + } + if (!_removedFromQueue && !_requeued) { + // will create a new instance of MongoFetchedJob Requeue(); } - - _disposed = true; + } + + private void StartHeartbeat(TimeSpan slidingInvisibilityTimeout) + { + var timerInterval = TimeSpan.FromSeconds(slidingInvisibilityTimeout.TotalSeconds / 5); + var filter = new BsonDocument("_id", Id); + var update = new BsonDocument + { + ["$currentDate"] = new BsonDocument + { + [nameof(JobDto.FetchedAt)] = true + } + }; + var options = new FindOneAndUpdateOptions + { + IsUpsert = false, + ReturnDocument = ReturnDocument.After, + Projection = new BsonDocument + { + [nameof(JobDto.FetchedAt)] = 1 + } + }; + _heartbeatTimer = new Timer(_ => + { + // Timer callback may be invoked after the Dispose method call, + // so we are using lock to avoid un synchronized calls. + lock (_syncRoot) + { + if (_disposed) + { + return; + } + + if (_requeued || _removedFromQueue) + { + return; + } + + Stopwatch sw = null; + if (Logger.IsTraceEnabled()) + { + sw = Stopwatch.StartNew(); + } + + try + { + var result = _db.JobGraph.FindOneAndUpdate(filter, update, options); + _fetchedAt = result[nameof(JobDto.FetchedAt)].ToUniversalTime(); + if (Logger.IsTraceEnabled() && sw != null) + { + var serializedModel = new Dictionary + { + ["Filter"] = filter, + ["Update"] = update + }; + sw.Stop(); + var builder = new StringBuilder(); + builder.AppendLine($"Job heartbeat"); + builder.AppendLine($"{serializedModel.ToJson()}"); + builder.AppendLine($"Executed in {sw.ElapsedMilliseconds} ms"); + Logger.Trace($"{builder}"); + } + } + catch (Exception ex) + { + Logger.Error($"Job: {Id} - Unable to update heartbeat. Details:\r\n{ex}"); + } + } + }, null, timerInterval, timerInterval); } } } \ No newline at end of file diff --git a/src/Hangfire.Mongo/MongoJobFetcher.cs b/src/Hangfire.Mongo/MongoJobFetcher.cs index 143834a..22e3b9e 100644 --- a/src/Hangfire.Mongo/MongoJobFetcher.cs +++ b/src/Hangfire.Mongo/MongoJobFetcher.cs @@ -43,7 +43,7 @@ public MongoJobFetcher(HangfireDbContext dbContext, MongoStorageOptions storageO } /// - /// Fetches net job, blocks until job is successfully fetched + /// Fetches next job, blocks until job is successfully fetched /// Queues are in prioritized order /// /// @@ -127,10 +127,10 @@ public virtual MongoFetchedJob TryAllQueues(string[] queues, CancellationToken c public virtual MongoFetchedJob TryGetEnqueuedJob(string queue, CancellationToken cancellationToken) { var fetchedAtQuery = new BsonDocument(nameof(JobDto.FetchedAt), BsonNull.Value); - if(_storageOptions.InvisibilityTimeout.HasValue) + if(_storageOptions.SlidingInvisibilityTimeout.HasValue) { var date = - DateTime.UtcNow.AddSeconds(_storageOptions.InvisibilityTimeout.Value.Negate().TotalSeconds); + DateTime.UtcNow.AddSeconds(_storageOptions.SlidingInvisibilityTimeout.Value.Negate().TotalSeconds); fetchedAtQuery = new BsonDocument("$or", new BsonArray { new BsonDocument(nameof(JobDto.FetchedAt), BsonNull.Value), diff --git a/src/Hangfire.Mongo/MongoStorageOptions.cs b/src/Hangfire.Mongo/MongoStorageOptions.cs index 3f44c25..5b8e9bd 100644 --- a/src/Hangfire.Mongo/MongoStorageOptions.cs +++ b/src/Hangfire.Mongo/MongoStorageOptions.cs @@ -1,5 +1,4 @@ using System; -using MongoDB.Driver; namespace Hangfire.Mongo { @@ -24,7 +23,7 @@ public MongoStorageOptions() { Prefix = "hangfire"; QueuePollInterval = TimeSpan.FromSeconds(15); - InvisibilityTimeout = null; + SlidingInvisibilityTimeout = TimeSpan.FromMinutes(5); DistributedLockLifetime = TimeSpan.FromSeconds(30); JobExpirationCheckInterval = TimeSpan.FromHours(1); CountersAggregateInterval = TimeSpan.FromMinutes(5); @@ -108,9 +107,11 @@ public TimeSpan QueuePollInterval public bool SupportsCappedCollection { get; set; } = true; /// - /// Invisibility timeout + /// If 'SlidingInvisibilityTimeout' a has value, Hangfire.Mongo will periodically update a jobs timestamp. + /// 'SlidingInvisibilityTimeout' determines how long time before Hangfire.Mongo decides the job is abandoned + /// default = 5 min, if set to null, jobs will never be abandoned /// - public TimeSpan? InvisibilityTimeout { get; set; } + public TimeSpan? SlidingInvisibilityTimeout { get; set; } /// /// Lifetime of distributed lock diff --git a/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs b/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs index 0ceef7e..0ce36bb 100644 --- a/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs +++ b/src/Hangfire.Mongo/MongoWriteOnlyTransaction.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using Hangfire.Annotations; @@ -164,6 +165,13 @@ public override void SetJobState(string jobId, IState state) var updates = GetOrAddJobUpdates(jobId); updates.Set[nameof(JobDto.StateName)] = state.Name; + + // if job is enqueued, we need to reset the FetchedAt value so it will be picked up by the queue + if (state is EnqueuedState) + { + updates.Set[nameof(JobDto.FetchedAt)] = BsonNull.Value; + } + updates.Pushes.Add(new BsonDocument { [nameof(JobDto.StateHistory)] = stateDto @@ -444,8 +452,6 @@ public override void Commit() return; } - Log(_writeModels); - var jobGraph = DbContext .Database .GetCollection(DbContext.JobGraph.CollectionNamespace.CollectionName); @@ -455,9 +461,20 @@ public override void Commit() IsOrdered = true, BypassDocumentValidation = false }; - + + Stopwatch sw = null; + if (Logger.IsTraceEnabled()) + { + sw = Stopwatch.StartNew(); + } + ExecuteCommit(jobGraph, _writeModels, bulkWriteOptions); + if (Logger.IsTraceEnabled() && sw != null) + { + Log(_writeModels, sw.ElapsedMilliseconds); + } + _removedJobs.ForEach(j => j.SetRemoved()); _distributedLock?.Dispose(); @@ -475,21 +492,21 @@ protected virtual void ExecuteCommit( jobGraph.BulkWrite(writeModels, bulkWriteOptions); } - protected virtual void Log(IList> writeModels) + protected virtual void Log(IList> writeModels, long elapsedMilliseconds) { - if (!Logger.IsTraceEnabled()) - { - return; - } - var builder = new StringBuilder(); + builder.AppendLine($"Commit (bulk write)"); + foreach (var writeModel in writeModels) { var serializedModel = SerializeWriteModel(writeModel); - builder.AppendLine($"{writeModel.ModelType}={serializedModel}"); + + builder.AppendLine($"{writeModel.ModelType}:"); + builder.AppendLine($"{serializedModel}"); } - Logger.Trace($"BulkWrite:\r\n{builder}"); + builder.AppendLine($"Executed in {elapsedMilliseconds} ms"); + Logger.Trace($"{builder}"); } public virtual void SignalJobsAddedToQueues(ICollection queues)