Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Enqueued Job Trigger for Multiple Queues #320

Merged
18 changes: 8 additions & 10 deletions src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class PostgreSqlJobQueue : IPersistentJobQueue
{
private const string JobNotificationChannel = "new_job";

internal static readonly AutoResetEvent _newItemInQueueEvent = new(true);
internal static readonly AutoResetEventRegistry _newItemInEvents = new();
azygis marked this conversation as resolved.
Show resolved Hide resolved
private readonly PostgreSqlStorage _storage;

public PostgreSqlJobQueue(PostgreSqlStorage storage)
Expand Down Expand Up @@ -147,6 +147,12 @@ LIMIT 1
RETURNING ""id"" AS ""Id"", ""jobid"" AS ""JobId"", ""queue"" AS ""Queue"", ""fetchedat"" AS ""FetchedAt"";
";

WaitHandle[] nextFetchIterationWaitHandles = new[] {
cancellationToken.WaitHandle,
SignalDequeue,
JobQueueNotification,
}.Concat(_newItemInEvents.GetWaitHandles(queues)).ToArray();

do
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -181,15 +187,7 @@ LIMIT 1

if (fetchedJob == null)
{
WaitHandle.WaitAny(new[] {
cancellationToken.WaitHandle,
_newItemInQueueEvent,
SignalDequeue,
JobQueueNotification,
},
_storage.Options.QueuePollInterval);

cancellationToken.ThrowIfCancellationRequested();
WaitHandle.WaitAny(nextFetchIterationWaitHandles, _storage.Options.QueuePollInterval);
}
}
while (fetchedJob == null);
Expand Down
10 changes: 0 additions & 10 deletions src/Hangfire.PostgreSql/PostgreSqlStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -320,22 +320,12 @@ internal T UseTransaction<T>(DbConnection dedicatedConnection, Func<DbConnection

T result = func(connection, null);

// TransactionCompleted event is required here, because if this TransactionScope is enlisted within an ambient TransactionScope, the ambient TransactionScope controls when the TransactionScope completes.
Transaction.Current.TransactionCompleted += Current_TransactionCompleted;
transaction.Complete();

return result;
});
}

private static void Current_TransactionCompleted(object sender, TransactionEventArgs e)
{
if (e.Transaction.TransactionInformation.Status == TransactionStatus.Committed)
{
PostgreSqlJobQueue._newItemInQueueEvent.Set();
}
}

internal TransactionScope CreateTransactionScope(IsolationLevel? isolationLevel, TimeSpan? timeout = null)
{
return TransactionHelpers.CreateTransactionScope(isolationLevel, Options.EnableTransactionScopeEnlistment, timeout);
Expand Down
7 changes: 7 additions & 0 deletions src/Hangfire.PostgreSql/PostgreSqlWriteOnlyTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class PostgreSqlWriteOnlyTransaction : JobStorageTransaction
{
private readonly Queue<Action<IDbConnection>> _commandQueue = new();
private readonly Func<DbConnection> _dedicatedConnectionFunc;
private readonly List<string> _queuesWithAddedJobs = new();
azygis marked this conversation as resolved.
Show resolved Hide resolved

private readonly PostgreSqlStorage _storage;

Expand All @@ -56,6 +57,10 @@ public override void Commit()
command(connection);
}
}, CreateTransactionScope);

// Triggers signals for all queues to which jobs have been added in this transaction
_queuesWithAddedJobs.ForEach(PostgreSqlJobQueue._newItemInEvents.Set);
_queuesWithAddedJobs.Clear();
}

private TransactionScope CreateTransactionScope()
Expand Down Expand Up @@ -134,6 +139,8 @@ public override void AddToQueue(string queue, string jobId)
IPersistentJobQueue persistentQueue = provider.GetJobQueue();

QueueCommand(con => persistentQueue.Enqueue(con, queue, jobId));

_queuesWithAddedJobs.Add(queue);
}

public override void IncrementCounter(string key)
Expand Down
40 changes: 40 additions & 0 deletions src/Hangfire.PostgreSql/Utils/AutoResetEventRegistry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace Hangfire.PostgreSql.Utils
{
/// <summary>
/// Represents a registry for managing AutoResetEvent instances using event keys.
/// </summary>
public class AutoResetEventRegistry
{
private readonly ConcurrentDictionary<string, AutoResetEvent> _events = new();

/// <summary>
/// Retrieves the wait handles associated with the specified event keys.
/// </summary>
/// <param name="eventKeys">The event keys.</param>
/// <returns>An enumerable of wait handles.</returns>
public IEnumerable<WaitHandle> GetWaitHandles(IEnumerable<string> eventKeys)
{
foreach (string eventKey in eventKeys)
{
AutoResetEvent newHandle = _events.GetOrAdd(eventKey, _ => new AutoResetEvent(false));
yield return newHandle;
}
}

/// <summary>
/// Sets the specified event.
/// </summary>
/// <param name="eventKey">The event key.</param>
public void Set(string eventKey)
{
if (_events.TryGetValue(eventKey, out AutoResetEvent handle))
{
handle.Set();
}
}
}
}