Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RabbitMQDotNetClient.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,6 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsCodeFormatterSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsParsFormattingSettingsUpgrader/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EJavaScript_002ECodeStyle_002ESettingsUpgrade_002EJsWrapperSettingsUpgrader/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -51,19 +50,15 @@ public async Task Stop()

class WorkPool
{
readonly ConcurrentQueue<Work> workQueue;
readonly TimeSpan waitTime;
readonly BlockingCollection<Work> workQueue;
readonly CancellationTokenSource tokenSource;
readonly ModelBase model;
TaskCompletionSource<bool> messageArrived;
private Task task;

public WorkPool(ModelBase model)
{
this.model = model;
workQueue = new ConcurrentQueue<Work>();
messageArrived = new TaskCompletionSource<bool>();
waitTime = TimeSpan.FromMilliseconds(100);
workQueue = new BlockingCollection<Work>();
tokenSource = new CancellationTokenSource();
}

Expand All @@ -74,23 +69,14 @@ public void Start()

public void Enqueue(Work work)
{
workQueue.Enqueue(work);
messageArrived.TrySetResult(true);
workQueue.Add(work);
}

async Task Loop()
{
while (tokenSource.IsCancellationRequested == false)
foreach (var work in workQueue.GetConsumingEnumerable(tokenSource.Token))
{
Work work;
while (workQueue.TryDequeue(out work))
{
await work.Execute(model).ConfigureAwait(false);
}

await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
messageArrived.TrySetResult(true);
messageArrived = new TaskCompletionSource<bool>();
await work.Execute(model).ConfigureAwait(false);
}
}

Expand Down