Skip to content

Commit

Permalink
Fixes(#16) - Disable LazyExecute(SingleThreadEventLoop & LoopExecutor)
Browse files Browse the repository at this point in the history
  • Loading branch information
cuteant committed Aug 28, 2020
1 parent 78eeb31 commit ada8a3a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 44 deletions.
15 changes: 5 additions & 10 deletions src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,11 @@ public override void Execute(IRunnable task)
}
else
{
InternalLazyExecute(task);
// netty 第一个任务进来,不管是否延迟任务,都会启动线程
// 防止线程启动后,第一个进来的就是 lazy task
var firstTask = _firstTask;
if (firstTask) { _firstTask = false; }
InternalExecute(task, firstTask);
}
}

Expand All @@ -1057,15 +1061,6 @@ public override void LazyExecute(IRunnable task)
InternalExecute(task, false);
}

protected virtual void InternalLazyExecute(IRunnable task)
{
// netty 第一个任务进来,不管是否延迟任务,都会启动线程
// 防止线程启动后,第一个进来的就是 lazy task
var firstTask = _firstTask;
if (firstTask) { _firstTask = false; }
InternalExecute(task, firstTask);
}

[MethodImpl(InlineMethod.AggressiveOptimization)]
private void InternalExecute(IRunnable task, bool immediate)
{
Expand Down
18 changes: 18 additions & 0 deletions src/DotNetty.Transport.Libuv/LoopExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,24 @@ protected sealed override IQueue<IRunnable> NewTaskQueue(int maxPendingTasks)
return new CompatibleConcurrentQueue<IRunnable>();
}

public override void Execute(IRunnable task)
{
InternalExecute(task);
}

public override void LazyExecute(IRunnable task)
{
InternalExecute(task);
}

[MethodImpl(InlineMethod.AggressiveOptimization)]
private void InternalExecute(IRunnable task)
{
AddTask(task);

WakeUp(InEventLoop);
}

protected override void WakeUp(bool inEventLoop)
{
// If the executor is not in the event loop, wake up the loop by async handle immediately.
Expand Down
43 changes: 9 additions & 34 deletions src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class SingleThreadEventLoop : SingleThreadEventLoopBase

private readonly long _breakoutNanosInterval;
private readonly ManualResetEventSlim _emptyEvent;
private bool _firstTask; // 不需要设置 volatile

public SingleThreadEventLoop(IEventLoopGroup parent)
: this(parent, DefaultBreakoutInterval)
Expand Down Expand Up @@ -85,7 +84,6 @@ public SingleThreadEventLoop(IEventLoopGroup parent, IThreadFactory threadFactor
public SingleThreadEventLoop(IEventLoopGroup parent, IThreadFactory threadFactory, IRejectedExecutionHandler rejectedHandler, TimeSpan breakoutInterval)
: base(parent, threadFactory, false, int.MaxValue, rejectedHandler)
{
_firstTask = true;
_emptyEvent = new ManualResetEventSlim(false, 1);
_breakoutNanosInterval = PreciseTime.ToDelayNanos(breakoutInterval);
Start();
Expand Down Expand Up @@ -115,40 +113,20 @@ protected override void Run()
/// <inheritdoc />
public sealed override void Execute(IRunnable task)
{
if (!(task is ILazyRunnable)
#if DEBUG
&& WakesUpForTask(task)
#endif
)
{
InternalExecute(task, true);
}
else
{
InternalLazyExecute(task);
}
InternalExecute(task);
}

public sealed override void LazyExecute(IRunnable task)
{
InternalExecute(task, false);
}

protected override void InternalLazyExecute(IRunnable task)
{
// netty 第一个任务进来,不管是否延迟任务,都会启动线程
// 防止线程启动后,第一个进来的就是 lazy task
var firstTask = _firstTask;
if (firstTask) { _firstTask = false; }
InternalExecute(task, firstTask);
InternalExecute(task);
}

[MethodImpl(InlineMethod.AggressiveOptimization)]
private void InternalExecute(IRunnable task, bool immediate)
private void InternalExecute(IRunnable task)
{
AddTask(task);

if (immediate) { _emptyEvent.Set(); }
if (!InEventLoop) { _emptyEvent.Set(); }
}

protected sealed override void WakeUp(bool inEventLoop)
Expand All @@ -171,7 +149,10 @@ protected sealed override IRunnable PollTask()
#else
_emptyEvent.Reset();
#endif
if (_taskQueue.TryDequeue(out task) || IsShuttingDown) { return task; }
if (_taskQueue.TryDequeue(out task) || IsShuttingDown) // revisit queue as producer might have put a task in meanwhile
{
return task;
}

// revisit queue as producer might have put a task in meanwhile
if (ScheduledTaskQueue.TryPeek(out IScheduledRunnable nextScheduledTask))
Expand All @@ -180,14 +161,8 @@ protected sealed override IRunnable PollTask()
if ((ulong)delayNanos > 0UL) // delayNanos >= 0
{
var timeout = PreciseTime.ToMilliseconds(delayNanos);
if (_emptyEvent.Wait((int)Math.Min(timeout, MaxDelayMilliseconds)))
{
if (_taskQueue.TryDequeue(out task)) { return task; }
}
_emptyEvent.Wait((int)Math.Min(timeout, MaxDelayMilliseconds));
}

FetchFromScheduledTaskQueue();
_taskQueue.TryDequeue(out task);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ public TestHandler0(IPromise promise, IPromise assertPromise)

public override void ChannelActive(IChannelHandlerContext ctx)
{
Thread.Sleep(10);
// Ensure the promise was done before the handler method is triggered.
if (_promise.IsCompleted)
{
Expand Down

0 comments on commit ada8a3a

Please sign in to comment.