Skip to content

Commit

Permalink
Cosmosdb (#363)
Browse files Browse the repository at this point in the history
* fix cosmosdb requires all unique indexes to have fields

#362

* update version

* add job queue watch

add retry for bulkwrites

* minor optimization
  • Loading branch information
gottscj authored Sep 21, 2023
1 parent 6e589cb commit 08be7d0
Show file tree
Hide file tree
Showing 14 changed files with 396 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Description />
<Version>1.9.9</Version>
<Version>1.9.10</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Description />
<Version>1.9.9</Version>
<Version>1.9.10</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<AssemblyName>Hangfire.Mongo.Sample.NETCore</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Hangfire.Mongo.Sample.NETCore</PackageId>
<Version>1.9.9</Version>
<Version>1.9.10</Version>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<Copyright>Copyright © 2014-2019 Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Copyright>
<Authors>Sergey Zwezdin, Martin Lobger, Jonas Gottschau</Authors>
Expand Down
20 changes: 20 additions & 0 deletions src/Hangfire.Mongo/CosmosDB/CosmosConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using Hangfire.Mongo.Database;

namespace Hangfire.Mongo.CosmosDB;

/// <inheritdoc />
public class CosmosConnection : MongoConnection
{
/// <inheritdoc />
public CosmosConnection(HangfireDbContext database, MongoStorageOptions storageOptions)
: base(database, storageOptions)
{
}

/// <inheritdoc />
public override DateTime GetUtcDateTime()
{
return DateTime.UtcNow;
}
}
63 changes: 63 additions & 0 deletions src/Hangfire.Mongo/CosmosDB/CosmosDbWriteOnlyTransaction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Hangfire.Common;
using Hangfire.Logging;
using Hangfire.Mongo.Database;
using Hangfire.Mongo.Dto;
using Hangfire.Storage;
using MongoDB.Bson;
using MongoDB.Driver;

namespace Hangfire.Mongo.CosmosDB;

#pragma warning disable 1591

public class CosmosDbWriteOnlyTransaction : MongoWriteOnlyTransaction
{
public CosmosDbWriteOnlyTransaction(HangfireDbContext dbContext, MongoStorageOptions storageOptions) : base(
dbContext, storageOptions)
{
}

/// <summary>
/// check if we are inserting a Job, add the "Key" field as this is required in cosmos as there is
/// unique index for this field
/// </summary>
/// <param name="jobGraph"></param>
/// <param name="writeModels"></param>
/// <param name="bulkWriteOptions"></param>
protected override void ExecuteCommit(IMongoCollection<BsonDocument> jobGraph, List<WriteModel<BsonDocument>> writeModels, BulkWriteOptions bulkWriteOptions)
{
foreach (var insertOneModel in writeModels.OfType<InsertOneModel<BsonDocument>>())
{
var typeArray = insertOneModel.Document["_t"].AsBsonArray;
if (typeArray.Contains("JobDto"))
{
insertOneModel.Document[nameof(KeyJobDto.Key)] = insertOneModel.Document["_id"].ToString();
}
}

var trys = 3;
do
{
try
{
base.ExecuteCommit(jobGraph, writeModels, bulkWriteOptions);
break;
}
catch (Exception e)
{
trys -= 1;
Logger.WarnException("Error writing to DB", e);
if (trys == 0)
{
Logger.ErrorException("Throwing after 3 re-trys", e);
throw;
}
}
} while (true);
}
}

#pragma warning restore 1591
26 changes: 26 additions & 0 deletions src/Hangfire.Mongo/CosmosDB/CosmosFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Hangfire.Mongo.Database;

namespace Hangfire.Mongo.CosmosDB;

/// <inheritdoc />
public class CosmosFactory : MongoFactory
{
/// <inheritdoc />
public override MongoWriteOnlyTransaction CreateMongoWriteOnlyTransaction(HangfireDbContext dbContext,
MongoStorageOptions storageOptions)
{
return new CosmosDbWriteOnlyTransaction(dbContext, storageOptions);
}

/// <inheritdoc />
public override MongoConnection CreateMongoConnection(HangfireDbContext dbContext, MongoStorageOptions storageOptions)
{
return new CosmosConnection(dbContext, storageOptions);
}

/// <inheritdoc />
public override MongoJobQueueWatcher CreateMongoJobQueueWatcher(HangfireDbContext dbContext, MongoStorageOptions storageOptions)
{
return new CosmosQueueWatcher(dbContext, storageOptions, JobQueueSemaphore);
}
}
120 changes: 120 additions & 0 deletions src/Hangfire.Mongo/CosmosDB/CosmosQueueWatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using System;
using System.Threading;
using Hangfire.Logging;
using Hangfire.Mongo.Database;
using Hangfire.Mongo.Dto;
using Hangfire.States;
using MongoDB.Bson;
using MongoDB.Driver;

namespace Hangfire.Mongo.CosmosDB;

/// <summary>
///
/// </summary>
public class CosmosQueueWatcher : MongoJobQueueWatcher
{

/// <summary>
/// ctor
/// </summary>
/// <param name="dbContext"></param>
/// <param name="storageOptions"></param>
/// <param name="jobQueueSemaphore"></param>
public CosmosQueueWatcher(HangfireDbContext dbContext,
MongoStorageOptions storageOptions,
IJobQueueSemaphore jobQueueSemaphore)
: base(dbContext, storageOptions, jobQueueSemaphore)
{
}

/// <inheritdoc />
public override void Execute(CancellationToken cancellationToken)
{
Logger.Warn("Be careful using Watch. Its not thoroughly tested!");
var pipeline = new[]
{
new BsonDocument
{
["$match"] = new BsonDocument
{
["operationType"] = new BsonDocument
{
["$in"] = new BsonArray
{
"insert", "update", "replace"
}
}
}
},
new BsonDocument
{
["$project"] = new BsonDocument
{
["_id"] = 1,
["fullDocument"] = 1,
["ns"] = 1,
["documentKey"] = 1
}
}
};

while (!cancellationToken.IsCancellationRequested)
{
try
{
var cursor = DbContext
.Database
.GetCollection<BsonDocument>(DbContext.JobGraph.CollectionNamespace.CollectionName)
.Watch<BsonDocument>(pipeline, new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
});

if (Logger.IsTraceEnabled())
{
Logger.Trace("Watcher: Watching for enqueued jobs");
}

foreach (var change in cursor.ToEnumerable(cancellationToken))
{
var doc = change["fullDocument"].AsBsonDocument;
var types = doc["_t"].AsBsonArray;

if (!types.Contains("JobDto"))
{
continue;
}

var stateName = doc[nameof(JobDto.StateName)];
if (stateName == BsonNull.Value ||
stateName != EnqueuedState.StateName)
{
continue;
}

// var queue = change["updateDescription"]["updatedFields"][nameof(JobDto.Queue)].AsString;
JobQueueSemaphore.Release(doc[nameof(JobDto.Queue)].AsString);
if (Logger.IsTraceEnabled())
{
Logger.Trace("Watcher: Job enqueued, queue: " + doc[nameof(JobDto.Queue)].AsString);
}
}
}
catch (MongoCommandException e)
{
if (e.Message.Contains("$changeStream stage is only supported on replica sets"))
{
Logger.ErrorException(
"Current db does not support change stream (not a replica set, https://docs.mongodb.com/manual/reference/method/db.collection.watch/)\r\n" +
"if you need instant (almost) handling of enqueued jobs, please set 'CheckQueuedJobsStrategy' to 'TailNotificationsCollection' in MongoStorageOptions",
e);
throw;
}

// wait max allowed
cancellationToken.WaitHandle.WaitOne(MongoNotificationObserver.MaxTimeout);
}
}
}
}
50 changes: 48 additions & 2 deletions src/Hangfire.Mongo/CosmosDB/CosmosStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using Hangfire.Server;
using Hangfire.Storage;
using MongoDB.Driver;

namespace Hangfire.Mongo.CosmosDB
{
Expand All @@ -13,9 +18,50 @@ public class CosmosStorage : MongoStorage
/// <param name="mongoClient"></param>
/// <param name="databaseName"></param>
/// <param name="storageOptions"></param>
public CosmosStorage(IMongoClient mongoClient, string databaseName, CosmosStorageOptions storageOptions)
public CosmosStorage(IMongoClient mongoClient, string databaseName, CosmosStorageOptions storageOptions)
: base(mongoClient, databaseName, storageOptions)
{
if (storageOptions.CheckQueuedJobsStrategy == CheckQueuedJobsStrategy.TailNotificationsCollection)
{
throw new ArgumentException("CosmosDB does not support capped collections");
}
Features = new ReadOnlyDictionary<string, bool>(
new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
{
{JobStorageFeatures.ExtendedApi, true},
{JobStorageFeatures.JobQueueProperty, true},
{JobStorageFeatures.Connection.BatchedGetFirstByLowest, true},
{JobStorageFeatures.Connection.GetUtcDateTime, false},
{JobStorageFeatures.Connection.GetSetContains, true},
{JobStorageFeatures.Connection.LimitedGetSetCount, true},
{JobStorageFeatures.Transaction.AcquireDistributedLock, true},
{JobStorageFeatures.Transaction.CreateJob, true},
{JobStorageFeatures.Transaction.SetJobParameter, true},
{JobStorageFeatures.Monitoring.DeletedStateGraphs, true},
{JobStorageFeatures.Monitoring.AwaitingJobs, true}
});
}

/// <summary>
/// Returns collection of server components
/// </summary>
/// <returns>Collection of server components</returns>
public override IEnumerable<IServerComponent> GetComponents()
{
yield return StorageOptions.Factory.CreateMongoExpirationManager(HangfireDbContext, StorageOptions);
switch (StorageOptions.CheckQueuedJobsStrategy)
{
case CheckQueuedJobsStrategy.Watch:

yield return StorageOptions.Factory.CreateMongoJobQueueWatcher(HangfireDbContext, StorageOptions);
break;
case CheckQueuedJobsStrategy.Poll:
break;
case CheckQueuedJobsStrategy.TailNotificationsCollection:
throw new NotSupportedException("CosmosDB does not support capped collections");
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
1 change: 1 addition & 0 deletions src/Hangfire.Mongo/CosmosDB/CosmosStorageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public CosmosStorageOptions()
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.Poll;
CheckConnection = false;
MigrationLockTimeout = TimeSpan.FromMinutes(2);
Factory = new CosmosFactory();
}
}
}
6 changes: 3 additions & 3 deletions src/Hangfire.Mongo/Hangfire.Mongo.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<VersionPrefix>1.9.9</VersionPrefix>
<VersionPrefix>1.9.10</VersionPrefix>
<TargetFramework>netstandard2.0</TargetFramework>
<NoWarn>$(NoWarn);CS0618</NoWarn>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
Expand All @@ -21,8 +21,8 @@
<owners>Sergey Zwezdin, Jonas Gottschau</owners>
<Description>MongoDB storage implementation for Hangfire (background job system for ASP.NET applications).</Description>
<PackageTags>Hangfire AspNet OWIN MongoDB CosmosDB Long-Running Background Fire-And-Forget Delayed Recurring Tasks Jobs Scheduler Threading Queues</PackageTags>
<PackageReleaseNotes>1.9.9
- Fix Jobs queued directly when queue is specified when scheduling #359
<PackageReleaseNotes>1.9.10
- Fix CosmosDB duplicate key error #362
</PackageReleaseNotes>
<PackageReadmeFile>README.md</PackageReadmeFile>
<!--<PackageLicenseUrl>https://raw.githubusercontent.com/sergun/Hangfire.Mongo/master/LICENSE</PackageLicenseUrl>-->
Expand Down
Loading

0 comments on commit 08be7d0

Please sign in to comment.