Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Hangfire.PostgreSql.sln
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{766BE831-F758-46BC-AFD3-BBEEFE0F686F}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{5CA38188-92EE-453C-A04E-A506DF15A787}"
ProjectSection(SolutionItems) = preProject
README.md = README.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{0D30A51B-814F-474E-93B8-44E9C155255C}"
EndProject
Expand Down
99 changes: 99 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,105 @@ app.UseHangfireServer(options);

this provider would first process jobs in `a-long-running-queue`, then `general-queue` and lastly `very-fast-queue`.

### Startup resilience and transient database outages

Starting from version 1.20.13 (where `PostgreSqlStorageOptions` gained startup resilience options), the storage tries to be more tolerant to *transient* PostgreSQL outages during application startup.

#### Default behavior

By default, when you use the new-style configuration:

```csharp
services.AddHangfire((provider, config) =>
{
config.UsePostgreSqlStorage(opts =>
opts.UseNpgsqlConnection(Configuration.GetConnectionString("HangfireConnection")));
});

app.UseHangfireServer();
app.UseHangfireDashboard();
```

`PostgreSqlStorageOptions` uses the following defaults for startup resilience:

- `PrepareSchemaIfNecessary = true`
- `StartupConnectionMaxRetries = 5`
- `StartupConnectionBaseDelay = 1 second`
- `StartupConnectionMaxDelay = 1 minute`
- `AllowDegradedModeWithoutStorage = true`

With these defaults:

1. On application startup, when schema preparation is required, the storage will try to open a connection and install/upgrade the schema.
2. If the database is temporarily unavailable, it will retry the operation up to 6 times (1 initial attempt + 5 retries) with exponential backoff between attempts, capped at 1 minute.
3. If all attempts fail **during startup**, the storage enters a *degraded* state instead of crashing the whole process. Your ASP.NET Core application can still start and serve other endpoints that do not depend on Hangfire.
4. On the *first actual use* of the storage (e.g. dashboard, background job server), Hangfire will try to initialize again. If the database is available by then, initialization succeeds and everything works as usual. If it is still unavailable, an `InvalidOperationException` with the original database exception as `InnerException` is thrown at that call site.

This behavior is designed to make applications more robust in scenarios where the database may briefly lag behind the application during deployments or orchestrated restarts.

#### Opting out of resilient startup (fail fast)

If you prefer to fail the whole process immediately if the database is not reachable during startup – you can disable retries by setting `StartupConnectionMaxRetries` to `0`:

```csharp
var storageOptions = new PostgreSqlStorageOptions
{
PrepareSchemaIfNecessary = true,
StartupConnectionMaxRetries = 0, // disables resilient startup
AllowDegradedModeWithoutStorage = false, // fail fast if DB is down at startup
};

services.AddHangfire((provider, config) =>
{
config.UsePostgreSqlStorage(opts =>
opts.UseNpgsqlConnection(Configuration.GetConnectionString("HangfireConnection")),
storageOptions);
});
```

With this configuration:

- A single attempt is made to open a connection and prepare the schema.
- If that attempt fails, the storage constructor throws and application startup fails.

#### Controlling degraded mode

Degraded mode is controlled via `AllowDegradedModeWithoutStorage`:

- `true` (default): if all startup attempts fail, both startup and lazy initialization will keep the storage in an uninitialized state on failure and retry on subsequent uses, until initialization eventually succeeds.
- `false`: if all startup attempts fail, the storage constructor will throw an `InvalidOperationException("Failed to initialize Hangfire PostgreSQL storage.", innerException)`.

For example, to keep retries but still fail startup if the DB never becomes available:

```csharp
var storageOptions = new PostgreSqlStorageOptions
{
PrepareSchemaIfNecessary = true,
StartupConnectionMaxRetries = 10, // more aggressive retry policy
StartupConnectionBaseDelay = TimeSpan.FromSeconds(2),
StartupConnectionMaxDelay = TimeSpan.FromMinutes(2),
AllowDegradedModeWithoutStorage = false, // do not start the app without storage
};
```

#### Turning off schema preparation entirely

If you manage the Hangfire schema yourself (for example via migrations or a dedicated deployment step) and do not want the storage to touch the database during startup or on first use, set `PrepareSchemaIfNecessary = false`:

```csharp
var storageOptions = new PostgreSqlStorageOptions
{
PrepareSchemaIfNecessary = false, // no schema installation/upgrade
};
```

In this case:

- No schema initialization is performed by `PostgreSqlStorage`.
- The first query that actually needs the database will fail if the schema is missing or mismatched, so you must ensure it is created/updated out of band.

> Note: startup resilience settings (`StartupConnectionMaxRetries`, `AllowDegradedModeWithoutStorage`, etc.) only apply when `PrepareSchemaIfNecessary` is `true`.

### License

Copyright © 2014-2024 Frank Hommers https://github.com/hangfire-postgres/Hangfire.PostgreSql.
Expand Down
161 changes: 144 additions & 17 deletions src/Hangfire.PostgreSql/PostgreSqlStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using System.Data;
using System.Data.Common;
using System.Text;
using System.Threading;
using System.Transactions;
using Hangfire.Annotations;
using Hangfire.Logging;
Expand All @@ -39,7 +40,10 @@ namespace Hangfire.PostgreSql
public class PostgreSqlStorage : JobStorage
{
private readonly IConnectionFactory _connectionFactory;

private readonly object _initializationLock = new();
private bool _initialized;
private Exception _lastInitializationException;

private readonly Dictionary<string, bool> _features =
new(StringComparer.OrdinalIgnoreCase)
{
Expand Down Expand Up @@ -85,27 +89,18 @@ public PostgreSqlStorage(IConnectionFactory connectionFactory, PostgreSqlStorage
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
Options = options ?? throw new ArgumentNullException(nameof(options));

if (options.PrepareSchemaIfNecessary)
{
NpgsqlConnection connection = CreateAndOpenConnection();
try
{
PostgreSqlObjectsInstaller.Install(connection, options.SchemaName);
}
finally
{
if (connectionFactory is not ExistingNpgsqlConnectionFactory)
{
connection.Dispose();
}
}
}

InitializeQueueProviders();
if (Options.UseSlidingInvisibilityTimeout)
{
HeartbeatProcess = new PostgreSqlHeartbeatProcess();
}

// Perform eager initialization if schema preparation is requested. This can be made resilient
// via the options exposed on PostgreSqlStorageOptions.
if (Options.PrepareSchemaIfNecessary)
{
TryInitializeStorage(isStartup: true);
}
}

public PersistentJobQueueProviderCollection QueueProviders { get; internal set; }
Expand All @@ -116,11 +111,13 @@ public PostgreSqlStorage(IConnectionFactory connectionFactory, PostgreSqlStorage

public override IMonitoringApi GetMonitoringApi()
{
EnsureInitialized();
return new PostgreSqlMonitoringApi(this, QueueProviders);
}

public override IStorageConnection GetConnection()
{
EnsureInitialized();
return new PostgreSqlConnection(this);
}

Expand Down Expand Up @@ -199,6 +196,136 @@ internal NpgsqlConnection CreateAndOpenConnection()
}
}

/// <summary>
/// Ensures storage is initialized. When resilient startup and degraded mode are enabled,
/// this will attempt a lazy initialization on first use.
/// </summary>
private void EnsureInitialized()
{
if (_initialized || !Options.PrepareSchemaIfNecessary)
{
return;
}

lock (_initializationLock)
{
if (_initialized)
{
return;
}

TryInitializeStorage(isStartup: false);

if (!_initialized && !Options.AllowDegradedModeWithoutStorage)
{
// Initialization failed and degraded mode is not enabled - rethrow with the last error
// to give a clear signal to the caller.
throw new InvalidOperationException(
"Hangfire PostgreSQL storage is not initialized. See inner exception for details.",
_lastInitializationException);
}
}
}

private void TryInitializeStorage(bool isStartup)
{
// Fast-path: no resilient startup configured - keep the current behavior of a single attempt.
if (!Options.EnableResilientStartup)
{
PerformSingleInitializationAttempt();
_initialized = true;
_lastInitializationException = null;
return;
}

int attempts = 0;
int maxAttempts = 1 + Options.StartupConnectionMaxRetries; // initial + retries
Exception lastException = null;

while (attempts < maxAttempts)
{
try
{
PerformSingleInitializationAttempt();
_initialized = true;
_lastInitializationException = null;
return;
}
catch (Exception ex)
{
lastException = ex;
attempts++;

if (attempts >= maxAttempts)
{
break;
}

// Apply exponential backoff with capping.
TimeSpan delay = ComputeBackoffDelay(attempts, Options.StartupConnectionBaseDelay, Options.StartupConnectionMaxDelay);

try
{
Thread.Sleep(delay);
}
catch (ThreadInterruptedException)
{
// Preserve original exception and abort initialization.
break;
}
}
}

_initialized = false;
_lastInitializationException = lastException;

if (!Options.AllowDegradedModeWithoutStorage && isStartup)
{
// During startup without degraded mode, fail fast to avoid starting the app in
// a partially configured state.
throw new InvalidOperationException(
"Failed to initialize Hangfire PostgreSQL storage.",
lastException);
}

// When degraded mode is allowed, we swallow the exception here and leave storage
// uninitialized. Subsequent calls will attempt to initialize lazily via EnsureInitialized.
}

private void PerformSingleInitializationAttempt()
{
NpgsqlConnection connection = CreateAndOpenConnection();
try
{
PostgreSqlObjectsInstaller.Install(connection, Options.SchemaName);
}
finally
{
if (_connectionFactory is not ExistingNpgsqlConnectionFactory)
{
connection.Dispose();
}
}
}

private static TimeSpan ComputeBackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
{
if (attempt <= 0)
{
return baseDelay;
}

double factor = Math.Pow(2, attempt - 1);
double millis = baseDelay.TotalMilliseconds * factor;

if (millis < 0 || millis > maxDelay.TotalMilliseconds || double.IsInfinity(millis) || double.IsNaN(millis))
{
millis = maxDelay.TotalMilliseconds;
}

return TimeSpan.FromMilliseconds(millis);
}

internal void UseTransaction(DbConnection dedicatedConnection,
[InstantHandle] Action<DbConnection, IDbTransaction> action,
IsolationLevel? isolationLevel = null)
Expand Down
36 changes: 36 additions & 0 deletions src/Hangfire.PostgreSql/PostgreSqlStorageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public PostgreSqlStorageOptions()
EnableTransactionScopeEnlistment = true;
DeleteExpiredBatchSize = 1000;
UseSlidingInvisibilityTimeout = false;
StartupConnectionMaxRetries = 5;
StartupConnectionBaseDelay = TimeSpan.FromSeconds(1);
StartupConnectionMaxDelay = TimeSpan.FromMinutes(1);
AllowDegradedModeWithoutStorage = true;
}

public TimeSpan QueuePollInterval
Expand Down Expand Up @@ -134,6 +138,38 @@ public int DeleteExpiredBatchSize
/// </summary>
public bool UseSlidingInvisibilityTimeout { get; set; }

/// <summary>
/// Gets if additional resilience during storage initialization is enabled. When <see cref="StartupConnectionMaxRetries"/>
/// is greater than zero and <see cref="PrepareSchemaIfNecessary"/> is true, Hangfire will retry opening a
/// connection and installing schema instead of failing immediately.
/// This property is computed from <see cref="StartupConnectionMaxRetries"/>.
/// </summary>
public bool EnableResilientStartup => StartupConnectionMaxRetries > 0;

/// <summary>
/// Maximum number of additional attempts (after the initial one) to obtain a connection and
/// prepare the schema during startup when <see cref="EnableResilientStartup"/> is true.
/// Value of 0 keeps current behavior (no retries).
/// </summary>
public int StartupConnectionMaxRetries { get; set; }

/// <summary>
/// Base delay used to compute exponential backoff between startup connection attempts when <see cref="EnableResilientStartup"/> is true.
/// </summary>
public TimeSpan StartupConnectionBaseDelay { get; set; }

/// <summary>
/// Maximum delay between startup connection attempts when <see cref="EnableResilientStartup"/> is true.
/// </summary>
public TimeSpan StartupConnectionMaxDelay { get; set; }

/// <summary>
/// When true and <see cref="EnableResilientStartup"/> is enabled, storage initialization will
/// not throw even if all startup connection attempts fail. Instead, the storage starts in a
/// degraded mode and will attempt to initialize lazily on first use.
/// </summary>
public bool AllowDegradedModeWithoutStorage { get; set; }

private static void ThrowIfValueIsNotPositive(TimeSpan value, string fieldName)
{
string message = $"The {fieldName} property value should be positive. Given: {value}.";
Expand Down
Loading
Loading