Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding timeout and fix jobs not enqueued when requeued while processing (#380) #381

Merged
merged 6 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@

## Change log

### v1.10.0
- Update to Hangfire 1.8.9
- Use SlidingInvisibilityTimeout to determine whether a background job is still alive
- BREAKING: Remove InvisibilityTimeout
- Use server time for distributed lock instead of Datetime.UtcNow
- Fixed Job not requeued when requeued while in processing state (#380)

### v1.9.16
- Update to Hangfire 1.8.7
- Update to MongoDB 2.23.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public ActionResult Delayed(int id)
public ActionResult Recurring()
{
RecurringJob.AddOrUpdate("recurring-job",
() => PrintToDebug($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely);
() => Recurring($@"Hangfire recurring task started - {Guid.NewGuid()}"), Cron.Minutely);

return RedirectToAction("Index");
}
Expand All @@ -99,5 +99,11 @@ public static void PrintToDebug(string message)
{
Debug.WriteLine(message);
}

public static void Recurring(string message)
{
Thread.Sleep(15000);
Debug.WriteLine(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Description />
<Version>1.9.16</Version>
<Version>1.10.0</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand All @@ -21,8 +21,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="bootstrap" Version="5.3.2" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.9" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="jquery" Version="3.7.1" />
<PackageReference Include="Mongo2Go" Version="3.1.3" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
Expand Down
2 changes: 0 additions & 2 deletions src/Hangfire.Mongo.Sample.ASPNetCore/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using Hangfire.Mongo.Migration.Strategies;
using Hangfire.Mongo.Migration.Strategies.Backup;
using Microsoft.AspNetCore.Builder;
Expand Down Expand Up @@ -51,7 +50,6 @@ public void ConfigureServices(IServiceCollection services)
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.Watch,
InvisibilityTimeout = TimeSpan.FromMinutes(5)
};

//config.UseLogProvider(new FileLogProvider());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Description />
<Version>1.9.16</Version>
<Version>1.10.0</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand All @@ -21,8 +21,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="bootstrap" Version="5.3.2" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.9" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="jquery" Version="3.7.1" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
<PackageReference Include="Microsoft.VisualStudio.Web.BrowserLink" Version="2.2.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<AssemblyName>Hangfire.Mongo.Sample.NETCore</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Hangfire.Mongo.Sample.NETCore</PackageId>
<Version>1.9.16</Version>
<Version>1.10.0</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand All @@ -19,7 +19,7 @@
<ProjectReference Include="..\Hangfire.Mongo\Hangfire.Mongo.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="Mongo2Go" Version="3.1.3" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
</ItemGroup>
Expand Down
5 changes: 1 addition & 4 deletions src/Hangfire.Mongo.Tests/Hangfire.Mongo.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="xunit" Version="2.6.4" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.IO.Compression" />
<PackageReference Include="xunit" Version="2.6.6" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
Expand Down
6 changes: 3 additions & 3 deletions src/Hangfire.Mongo.Tests/MongoDistributedLockFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ public void Ctor_WaitForLock_SignaledAtLockRelease()
});
t.Start();

// Wait just a bit to make sure the above lock is acuired
// Wait just a bit to make sure the above lock is acquired
Thread.Sleep(TimeSpan.FromSeconds(1));

// Record when we try to aquire the lock
// Record when we try to acquire the lock
var startTime = DateTime.UtcNow;
var lock2 = new MongoDistributedLock("resource1", TimeSpan.FromSeconds(10), _database,
new MongoStorageOptions());
Expand Down Expand Up @@ -165,7 +165,7 @@ public void Ctor_SetLockExpireAtWorks_WhenResourceIsNotLocked()
};
using (lock1.AcquireLock())
{
DateTime initialExpireAt = DateTime.UtcNow;
var initialExpireAt = DateTime.UtcNow;
Thread.Sleep(TimeSpan.FromSeconds(5));

var lockEntry = _database.DistributedLock.Find(filter).FirstOrDefault();
Expand Down
22 changes: 22 additions & 0 deletions src/Hangfire.Mongo.Tests/MongoFetchedJobFacts.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using Hangfire.Mongo.Database;
using Hangfire.Mongo.Dto;
using Hangfire.Mongo.Tests.Utils;
Expand Down Expand Up @@ -132,6 +133,27 @@ public void Dispose_SetsFetchedAtValueToNull_IfThereWereNoCallsToComplete()
_dbContext.JobGraph.Find(new BsonDocument("_t", nameof(JobDto))).ToList().Single());
Assert.Null(record.FetchedAt);
}

[Fact]
public void Heartbeat_LonRunningJob_UpdatesFetchedAt()
{
// Arrange
// time out job after 1s
var options = new MongoStorageOptions() {SlidingInvisibilityTimeout = TimeSpan.FromSeconds(1)};
var queue = "default";
var jobId = ObjectId.GenerateNewId();
var id = CreateJobQueueRecord(_dbContext, jobId, queue, _fetchedAt);
var initialFetchedAt = DateTime.UtcNow;

// Act
var job = new MongoFetchedJob(_dbContext, options, initialFetchedAt, id, jobId, queue);
// job runs for 2s, Heartbeat updates job
Thread.Sleep(TimeSpan.FromSeconds(2));
job.Dispose();

// Assert
Assert.True(job.FetchedAt > initialFetchedAt, "Expected job FetchedAt field to be updated");
}

private ObjectId CreateJobQueueRecord(HangfireDbContext connection, ObjectId jobId, string queue, DateTime? fetchedAt)
{
Expand Down
7 changes: 5 additions & 2 deletions src/Hangfire.Mongo.Tests/MongoJobQueueFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void Dequeue_ShouldFetchATimedOutJobs_FromTheSpecifiedQueue()

var options = new MongoStorageOptions
{
InvisibilityTimeout = TimeSpan.FromMinutes(30)
SlidingInvisibilityTimeout = TimeSpan.FromMinutes(30)
};

_jobQueueSemaphoreMock.WaitNonBlock("default").Returns(true);
Expand Down Expand Up @@ -191,7 +191,10 @@ public void Dequeue_NoInvisibilityTimeout_WaitsForever()
};
_hangfireDbContext.JobGraph.InsertOne(job.Serialize());

var options = new MongoStorageOptions();
var options = new MongoStorageOptions
{
SlidingInvisibilityTimeout = null
};

_jobQueueSemaphoreMock.WaitNonBlock("default").Returns(true);
var queue =new MongoJobFetcher(_hangfireDbContext, options, _jobQueueSemaphoreMock);
Expand Down
2 changes: 1 addition & 1 deletion src/Hangfire.Mongo.Tests/MongoStorageOptionsFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void Ctor_SetsTheDefaultOptions()
MongoStorageOptions storageOptions = new MongoStorageOptions();

Assert.Equal("hangfire", storageOptions.Prefix);
Assert.Null(storageOptions.InvisibilityTimeout);
Assert.NotNull(storageOptions.SlidingInvisibilityTimeout);
}

[Fact]
Expand Down
3 changes: 1 addition & 2 deletions src/Hangfire.Mongo/Database/HangfireDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public sealed class HangfireDbContext
/// Database instance used for this db context instance
/// </summary>
public IMongoDatabase Database { get; }

internal HangfireDbContext(string connectionString, string databaseName, string prefix = "hangfire")
:this(new MongoClient(connectionString), databaseName, prefix)
{
Expand All @@ -41,7 +41,6 @@ public HangfireDbContext(IMongoClient mongoClient, string databaseName, string p
ConnectionId = Guid.NewGuid().ToString();
}


/// <summary>
/// Mongo database connection identifier
/// </summary>
Expand Down
52 changes: 42 additions & 10 deletions src/Hangfire.Mongo/DistributedLock/MongoDistributedLock.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using Hangfire.Logging;
using Hangfire.Mongo.Database;
Expand Down Expand Up @@ -112,11 +114,8 @@ public virtual void Dispose()
{
AcquiredLocks.Value.Remove(_resource);

if (_heartbeatTimer != null)
{
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
}
_heartbeatTimer?.Dispose();
_heartbeatTimer = null;

Release();

Expand Down Expand Up @@ -263,13 +262,15 @@ protected virtual void Cleanup()
/// </summary>
protected virtual void StartHeartBeat()
{
TimeSpan timerInterval = TimeSpan.FromMilliseconds(_storageOptions.DistributedLockLifetime.TotalMilliseconds / 5);
_heartbeatTimer = new Timer(state =>
var timerInterval = TimeSpan.FromMilliseconds(_storageOptions.DistributedLockLifetime.TotalMilliseconds / 5);
_heartbeatTimer = new Timer(_ =>
{
// Timer callback may be invoked after the Dispose method call,
// so we are using lock to avoid un synchronized calls.
lock (_lockObject)
{
if (_completed) return;

try
{
var filter = new BsonDocument
Expand All @@ -278,12 +279,43 @@ protected virtual void StartHeartBeat()
};
var update = new BsonDocument
{
["$set"] = new BsonDocument
[nameof(DistributedLockDto.ExpireAt)] = new BsonDocument
{
[nameof(DistributedLockDto.ExpireAt)] = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime)
["$add"] = new BsonArray
{
"$$NOW",
(int) _storageOptions.DistributedLockLifetime.TotalMilliseconds,
}
}
};
_dbContext.DistributedLock.FindOneAndUpdate(filter, update);

var pipeline = new BsonDocument[]
{
new BsonDocument("$match", filter),
new BsonDocument("$set", update)
};
Stopwatch sw = null;
if (Logger.IsTraceEnabled())
{
sw = Stopwatch.StartNew();
}

_dbContext.DistributedLock.Aggregate<BsonDocument>(pipeline).FirstOrDefault();

if (Logger.IsTraceEnabled() && sw != null)
{
var serializedModel = new Dictionary<string, BsonDocument>
{
["Filter"] = filter,
["Update"] = update
};
sw.Stop();
var builder = new StringBuilder();
builder.AppendLine($"Lock heartbeat");
builder.AppendLine($"{serializedModel.ToJson()}");
builder.AppendLine($"Executed in {sw.ElapsedMilliseconds} ms");
Logger.Trace($"{builder}");
}
}
catch (Exception ex)
{
Expand Down
13 changes: 8 additions & 5 deletions src/Hangfire.Mongo/Hangfire.Mongo.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<VersionPrefix>1.9.16</VersionPrefix>
<VersionPrefix>1.10.0</VersionPrefix>
<TargetFramework>netstandard2.0</TargetFramework>
<NoWarn>$(NoWarn);CS0618</NoWarn>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
Expand All @@ -21,9 +21,12 @@
<owners>Sergey Zwezdin, Jonas Gottschau</owners>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<PackageTags>Hangfire AspNet OWIN MongoDB CosmosDB Long-Running Background Fire-And-Forget Delayed Recurring Tasks Jobs Scheduler Threading Queues</PackageTags>
<PackageReleaseNotes>1.9.16
- Update to Hangfire 1.8.7
- Update to MongoDB 2.23.1
<PackageReleaseNotes>1.10.0
- Update to Hangfire 1.8.9
- Use SlidingInvisibilityTimeout to determine whether a background job is still alive
- BREAKING: Remove InvisibilityTimeout
- Use server time for distributed lock instead of Datetime.UtcNow
- Fixed Job not requeued when requeued while in processing state (#380)
</PackageReleaseNotes>
<PackageReadmeFile>README.md</PackageReadmeFile>
<!--<PackageLicenseUrl>https://raw.githubusercontent.com/sergun/Hangfire.Mongo/master/LICENSE</PackageLicenseUrl>-->
Expand All @@ -35,7 +38,7 @@
<None Include="../../README.md" pack="true" PackagePath="." />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Hangfire.Core" Version="1.8.7" />
<PackageReference Include="Hangfire.Core" Version="1.8.9" />
<PackageReference Include="MongoDB.Driver" Version="2.23.1" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Hangfire.Mongo/MongoConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public override DateTime GetUtcDateTime()

if (Logger.IsTraceEnabled())
{
Logger.Trace($"GetUtcDateTime() => {now}");
Logger.Trace($"GetUtcDateTime() => {now:O}");
}

return now;
Expand Down
Loading
Loading