Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for storage which does not support capped collections #366

Merged
merged 2 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/Hangfire.Mongo.Tests/MongoStorageFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ public void Ctor_ThrowsAnException_WhenStorageOptionsValueIsNull()

Assert.Equal("storageOptions", exception.ParamName);
}

[Fact]
public void Ctor_DoesNotSupportCappedAndTailNotificationChosen_ThrowsAnException()
{
var exception = Assert.Throws<NotSupportedException>(() => new MongoStorage(
MongoClientSettings.FromConnectionString("mongodb://localhost"),
"test",
new MongoStorageOptions
{
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection,
SupportsCappedCollection = false
}));

Assert.Contains("CheckQueuedJobsStrategy, cannot be TailNotificationsCollection if", exception.Message);
}

[Fact]
public void GetMonitoringApi_ReturnsNonNullInstance()
Expand Down
1 change: 1 addition & 0 deletions src/Hangfire.Mongo/CosmosDB/CosmosStorageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public CosmosStorageOptions()
CheckConnection = false;
MigrationLockTimeout = TimeSpan.FromMinutes(2);
Factory = new CosmosFactory();
SupportsCappedCollection = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal class CreateSignalCollection : IMongoMigrationStep

public bool Execute(IMongoDatabase database, MongoStorageOptions storageOptions, IMongoMigrationContext migrationContext)
{
if (storageOptions is CosmosStorageOptions)
if (!storageOptions.SupportsCappedCollection)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ internal class AddNotificationsCollection : IMongoMigrationStep
public bool Execute(IMongoDatabase database, MongoStorageOptions storageOptions,
IMongoMigrationContext migrationContext)
{
if (storageOptions is CosmosStorageOptions) return true;
if (!storageOptions.SupportsCappedCollection)
{
return true;
}

database.CreateCollection(storageOptions.Prefix + ".notifications", new CreateCollectionOptions
{
Expand Down
67 changes: 42 additions & 25 deletions src/Hangfire.Mongo/MongoStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class MongoStorage : JobStorage
/// Storage options
/// </summary>
protected readonly MongoStorageOptions StorageOptions;

/// <summary>
/// DB context
/// </summary>
Expand All @@ -42,19 +42,20 @@ public class MongoStorage : JobStorage
/// <summary>
/// Enabled Hangfire features. To change enabled features, inherit this class and override 'HasFeature' method
/// </summary>
public ReadOnlyDictionary<string, bool> Features { get; protected set; } = new ReadOnlyDictionary<string, bool>(new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
public ReadOnlyDictionary<string, bool> Features { get; protected set; } = new ReadOnlyDictionary<string, bool>(
new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
{
{ JobStorageFeatures.ExtendedApi, true },
{ JobStorageFeatures.JobQueueProperty, true },
{ JobStorageFeatures.Connection.BatchedGetFirstByLowest, true },
{ JobStorageFeatures.Connection.GetUtcDateTime, true },
{ JobStorageFeatures.Connection.GetSetContains, true },
{ JobStorageFeatures.Connection.LimitedGetSetCount, true },
{ JobStorageFeatures.Transaction.AcquireDistributedLock, true },
{ JobStorageFeatures.Transaction.CreateJob, true },
{ JobStorageFeatures.Transaction.SetJobParameter, true },
{ JobStorageFeatures.Monitoring.DeletedStateGraphs, true },
{ JobStorageFeatures.Monitoring.AwaitingJobs, true }
{JobStorageFeatures.ExtendedApi, true},
{JobStorageFeatures.JobQueueProperty, true},
{JobStorageFeatures.Connection.BatchedGetFirstByLowest, true},
{JobStorageFeatures.Connection.GetUtcDateTime, true},
{JobStorageFeatures.Connection.GetSetContains, true},
{JobStorageFeatures.Connection.LimitedGetSetCount, true},
{JobStorageFeatures.Transaction.AcquireDistributedLock, true},
{JobStorageFeatures.Transaction.CreateJob, true},
{JobStorageFeatures.Transaction.SetJobParameter, true},
{JobStorageFeatures.Monitoring.DeletedStateGraphs, true},
{JobStorageFeatures.Monitoring.AwaitingJobs, true}
});

/// <summary>
Expand All @@ -73,10 +74,10 @@ public MongoStorage(MongoClientSettings mongoClientSettings, string databaseName
/// <param name="mongoClientSettings">Client settings for MongoDB</param>
/// <param name="databaseName">Database name</param>
/// <param name="storageOptions">Storage options</param>
public MongoStorage(MongoClientSettings mongoClientSettings, string databaseName, MongoStorageOptions storageOptions)
public MongoStorage(MongoClientSettings mongoClientSettings, string databaseName,
MongoStorageOptions storageOptions)
: this(new MongoClient(mongoClientSettings), databaseName, storageOptions)
{

}

/// <summary>
Expand All @@ -92,10 +93,22 @@ public MongoStorage(IMongoClient mongoClient, string databaseName, MongoStorageO
{
throw new ArgumentNullException(nameof(databaseName));
}
StorageOptions = storageOptions ?? throw new ArgumentNullException(nameof(storageOptions));

if (storageOptions.CheckQueuedJobsStrategy == CheckQueuedJobsStrategy.TailNotificationsCollection &&
storageOptions.SupportsCappedCollection == false)
{
throw new NotSupportedException(
$"{nameof(MongoStorageOptions.CheckQueuedJobsStrategy)}, cannot be {CheckQueuedJobsStrategy.TailNotificationsCollection}" +
$" if {nameof(MongoStorageOptions.SupportsCappedCollection)} is false"
);
}

DatabaseName = databaseName;
MongoClient = mongoClient ?? throw new ArgumentNullException(nameof(mongoClient));
StorageOptions = storageOptions ?? throw new ArgumentNullException(nameof(storageOptions));
HangfireDbContext = StorageOptions.Factory.CreateDbContext(mongoClient, databaseName, storageOptions.Prefix);

HangfireDbContext =
StorageOptions.Factory.CreateDbContext(mongoClient, databaseName, storageOptions.Prefix);

if (StorageOptions.CheckConnection)
{
Expand All @@ -107,24 +120,25 @@ public MongoStorage(IMongoClient mongoClient, string databaseName, MongoStorageO
MongoMigrationManager.MigrateIfNeeded(storageOptions, HangfireDbContext.Database);
}
}

private void CheckConnection()
{
using (var cts = new CancellationTokenSource(StorageOptions.ConnectionCheckTimeout))
{
try
{
HangfireDbContext.Database.RunCommand((Command<BsonDocument>)"{ping:1}", cancellationToken: cts.Token);
HangfireDbContext.Database.RunCommand((Command<BsonDocument>) "{ping:1}",
cancellationToken: cts.Token);
}
catch (Exception e)
{
throw new MongoConnectException(HangfireDbContext, CreateObscuredConnectionString(), StorageOptions.ConnectionCheckTimeout, e);
throw new MongoConnectException(HangfireDbContext, CreateObscuredConnectionString(),
StorageOptions.ConnectionCheckTimeout, e);
}
}
}

/// <inheritdoc/>

public override bool HasFeature([NotNull] string featureId)
{
if (featureId == null) throw new ArgumentNullException(nameof(featureId));
Expand All @@ -135,7 +149,6 @@ public override bool HasFeature([NotNull] string featureId)
}



/// <summary>
/// Returns Monitoring API object
/// </summary>
Expand Down Expand Up @@ -167,7 +180,11 @@ public override IEnumerable<IServerComponent> GetComponents()
yield return StorageOptions.Factory.CreateMongoJobQueueWatcher(HangfireDbContext, StorageOptions);
break;
case CheckQueuedJobsStrategy.TailNotificationsCollection:
yield return StorageOptions.Factory.CreateMongoNotificationObserver(HangfireDbContext, StorageOptions);
if (StorageOptions.SupportsCappedCollection)
{
yield return StorageOptions.Factory.CreateMongoNotificationObserver(HangfireDbContext,
StorageOptions);
}
break;
}
}
Expand All @@ -187,8 +204,8 @@ public override void WriteOptionsToLog(ILog logger)
/// </summary>
public override string ToString()
{

return $"Connection string: {CreateObscuredConnectionString()}, database name: {DatabaseName}, prefix: {StorageOptions.Prefix}";
return
$"Connection string: {CreateObscuredConnectionString()}, database name: {DatabaseName}, prefix: {StorageOptions.Prefix}";
}

private string CreateObscuredConnectionString()
Expand Down
8 changes: 8 additions & 0 deletions src/Hangfire.Mongo/MongoStorageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public TimeSpan QueuePollInterval
_queuePollInterval = value;
}
}

/// <summary>
/// Indicates if the underlying storage supports capped collections
/// default set to true.
/// MongoStorage will throw NotSupportedException if 'SupportsCappedCollection' is false
/// and 'CheckQueuedJobsStrategy' is TailNotificationsCollection
/// </summary>
public bool SupportsCappedCollection { get; set; } = true;

/// <summary>
/// Invisibility timeout
Expand Down
Loading