Skip to content

Commit

Permalink
Merge pull request #81 from nayato/no-breakout
Browse files Browse the repository at this point in the history
STEE: smarter idle waiting for scheduled tasks
  • Loading branch information
nayato committed Apr 6, 2016
2 parents 1986553 + ba716c6 commit ed028ed
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 8 deletions.
21 changes: 16 additions & 5 deletions src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
readonly MpscLinkedQueue<IRunnable> taskQueue = new MpscLinkedQueue<IRunnable>();
Thread thread;
volatile int executionState = ST_NOT_STARTED;
readonly TimeSpan breakoutInterval;
readonly PreciseTimeSpan preciseBreakoutInterval;
PreciseTimeSpan lastExecutionTime;
readonly ManualResetEventSlim emptyEvent = new ManualResetEventSlim();
Expand All @@ -41,7 +40,6 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval)
{
this.terminationCompletionSource = new TaskCompletionSource();
this.breakoutInterval = breakoutInterval;
this.preciseBreakoutInterval = PreciseTimeSpan.FromTimeSpan(breakoutInterval);
this.scheduler = new ExecutorTaskScheduler(this);
this.thread = new Thread(this.Loop)
Expand Down Expand Up @@ -400,10 +398,23 @@ IRunnable PollTask()
if (task == null)
{
this.emptyEvent.Reset();
if ((task = this.taskQueue.Dequeue()) == null // revisit queue as producer might have put a task in meanwhile
&& this.emptyEvent.Wait(this.breakoutInterval))
if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile
{
task = this.taskQueue.Dequeue();
IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek();
if (nextScheduledTask != null)
{
TimeSpan wakeUpTimeout = (nextScheduledTask.Deadline - PreciseTimeSpan.FromStart).ToTimeSpan();
if (this.emptyEvent.Wait(wakeUpTimeout))
{
// woken up before the next scheduled task was due
task = this.taskQueue.Dequeue();
}
}
else
{
this.emptyEvent.Wait();
task = this.taskQueue.Dequeue();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Common/DotNetty.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
<Compile Include="IResourceLeakHint.cs" />
<Compile Include="ThreadLocalObjectList.cs" />
<Compile Include="ThreadLocalPool.cs" />
<Compile Include="Timestamp.cs" />
<Compile Include="PreciseTimeSpan.cs" />
<Compile Include="Utilities\AtomicReference.cs" />
<Compile Include="Utilities\BitOps.cs" />
<Compile Include="Utilities\ByteArrayExtensions.cs" />
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace DotNetty.Transport.Channels

public class SingleThreadEventLoop : SingleThreadEventExecutor, IEventLoop
{
static readonly TimeSpan DefaultBreakoutInterval = TimeSpan.FromSeconds(5);
static readonly TimeSpan DefaultBreakoutInterval = TimeSpan.FromMilliseconds(100);

public SingleThreadEventLoop()
: this(null, DefaultBreakoutInterval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,40 @@ public void FuzzyScheduling(int producerCount, bool perCpu, int taskPerProducer)
Assert.True(mre.WaitOne(TimeSpan.FromSeconds(5)));
}

public class Container<T>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ScheduledTaskFiresOnTime(bool scheduleFromExecutor)
{
var scheduler = new SingleThreadEventExecutor(null, TimeSpan.FromMinutes(1));
var promise = new TaskCompletionSource();
Func<Task> scheduleFunc = () => scheduler.ScheduleAsync(() => promise.Complete(), TimeSpan.FromMilliseconds(100));
Task task = scheduleFromExecutor ? await scheduler.SubmitAsync(scheduleFunc) : scheduleFunc();
await Task.WhenAny(task, Task.Delay(TimeSpan.FromMilliseconds(300)));
Assert.True(task.IsCompleted);
}

[Fact]
public async Task ScheduledTaskFiresOnTimeWhileBusy()
{
var scheduler = new SingleThreadEventExecutor(null, TimeSpan.FromMilliseconds(10));
var promise = new TaskCompletionSource();
Action selfQueueAction = null;
selfQueueAction = () =>
{
if (!promise.Task.IsCompleted)
{
scheduler.Execute(selfQueueAction);
}
};

scheduler.Execute(selfQueueAction);
Task task = scheduler.ScheduleAsync(() => promise.Complete(), TimeSpan.FromMilliseconds(100));
await Task.WhenAny(task, Task.Delay(TimeSpan.FromMilliseconds(300)));
Assert.True(task.IsCompleted);
}

class Container<T>
{
public T Value;
}
Expand Down

0 comments on commit ed028ed

Please sign in to comment.