From ada8a3a97c28efa3fb911097f8a2589947c30ade Mon Sep 17 00:00:00 2001 From: cuteant Date: Sat, 29 Aug 2020 02:49:41 +0800 Subject: [PATCH] Fixes(#16) - Disable LazyExecute(SingleThreadEventLoop & LoopExecutor) --- .../Concurrency/SingleThreadEventExecutor.cs | 15 +++---- src/DotNetty.Transport.Libuv/LoopExecutor.cs | 18 ++++++++ .../Channels/SingleThreadEventLoop.cs | 43 ++++--------------- .../Channel/Local/LocalChannelTest.cs | 1 + 4 files changed, 33 insertions(+), 44 deletions(-) diff --git a/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs b/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs index b46baa91d..a2872b06a 100644 --- a/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs +++ b/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs @@ -1048,7 +1048,11 @@ public override void Execute(IRunnable task) } else { - InternalLazyExecute(task); + // netty 第一个任务进来,不管是否延迟任务,都会启动线程 + // 防止线程启动后,第一个进来的就是 lazy task + var firstTask = _firstTask; + if (firstTask) { _firstTask = false; } + InternalExecute(task, firstTask); } } @@ -1057,15 +1061,6 @@ public override void LazyExecute(IRunnable task) InternalExecute(task, false); } - protected virtual void InternalLazyExecute(IRunnable task) - { - // netty 第一个任务进来,不管是否延迟任务,都会启动线程 - // 防止线程启动后,第一个进来的就是 lazy task - var firstTask = _firstTask; - if (firstTask) { _firstTask = false; } - InternalExecute(task, firstTask); - } - [MethodImpl(InlineMethod.AggressiveOptimization)] private void InternalExecute(IRunnable task, bool immediate) { diff --git a/src/DotNetty.Transport.Libuv/LoopExecutor.cs b/src/DotNetty.Transport.Libuv/LoopExecutor.cs index e7a450cc9..9365c1586 100644 --- a/src/DotNetty.Transport.Libuv/LoopExecutor.cs +++ b/src/DotNetty.Transport.Libuv/LoopExecutor.cs @@ -207,6 +207,24 @@ protected sealed override IQueue NewTaskQueue(int maxPendingTasks) return new CompatibleConcurrentQueue(); } + public override void Execute(IRunnable task) + { + InternalExecute(task); + } + + public override void LazyExecute(IRunnable task) + { + InternalExecute(task); + } + + [MethodImpl(InlineMethod.AggressiveOptimization)] + private void InternalExecute(IRunnable task) + { + AddTask(task); + + WakeUp(InEventLoop); + } + protected override void WakeUp(bool inEventLoop) { // If the executor is not in the event loop, wake up the loop by async handle immediately. diff --git a/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs b/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs index 925d03bb0..a85f7e566 100644 --- a/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs +++ b/src/DotNetty.Transport/Channels/SingleThreadEventLoop.cs @@ -45,7 +45,6 @@ public class SingleThreadEventLoop : SingleThreadEventLoopBase private readonly long _breakoutNanosInterval; private readonly ManualResetEventSlim _emptyEvent; - private bool _firstTask; // 不需要设置 volatile public SingleThreadEventLoop(IEventLoopGroup parent) : this(parent, DefaultBreakoutInterval) @@ -85,7 +84,6 @@ public SingleThreadEventLoop(IEventLoopGroup parent, IThreadFactory threadFactor public SingleThreadEventLoop(IEventLoopGroup parent, IThreadFactory threadFactory, IRejectedExecutionHandler rejectedHandler, TimeSpan breakoutInterval) : base(parent, threadFactory, false, int.MaxValue, rejectedHandler) { - _firstTask = true; _emptyEvent = new ManualResetEventSlim(false, 1); _breakoutNanosInterval = PreciseTime.ToDelayNanos(breakoutInterval); Start(); @@ -115,40 +113,20 @@ protected override void Run() /// public sealed override void Execute(IRunnable task) { - if (!(task is ILazyRunnable) -#if DEBUG - && WakesUpForTask(task) -#endif - ) - { - InternalExecute(task, true); - } - else - { - InternalLazyExecute(task); - } + InternalExecute(task); } public sealed override void LazyExecute(IRunnable task) { - InternalExecute(task, false); - } - - protected override void InternalLazyExecute(IRunnable task) - { - // netty 第一个任务进来,不管是否延迟任务,都会启动线程 - // 防止线程启动后,第一个进来的就是 lazy task - var firstTask = _firstTask; - if (firstTask) { _firstTask = false; } - InternalExecute(task, firstTask); + InternalExecute(task); } [MethodImpl(InlineMethod.AggressiveOptimization)] - private void InternalExecute(IRunnable task, bool immediate) + private void InternalExecute(IRunnable task) { AddTask(task); - if (immediate) { _emptyEvent.Set(); } + if (!InEventLoop) { _emptyEvent.Set(); } } protected sealed override void WakeUp(bool inEventLoop) @@ -171,7 +149,10 @@ protected sealed override IRunnable PollTask() #else _emptyEvent.Reset(); #endif - if (_taskQueue.TryDequeue(out task) || IsShuttingDown) { return task; } + if (_taskQueue.TryDequeue(out task) || IsShuttingDown) // revisit queue as producer might have put a task in meanwhile + { + return task; + } // revisit queue as producer might have put a task in meanwhile if (ScheduledTaskQueue.TryPeek(out IScheduledRunnable nextScheduledTask)) @@ -180,14 +161,8 @@ protected sealed override IRunnable PollTask() if ((ulong)delayNanos > 0UL) // delayNanos >= 0 { var timeout = PreciseTime.ToMilliseconds(delayNanos); - if (_emptyEvent.Wait((int)Math.Min(timeout, MaxDelayMilliseconds))) - { - if (_taskQueue.TryDequeue(out task)) { return task; } - } + _emptyEvent.Wait((int)Math.Min(timeout, MaxDelayMilliseconds)); } - - FetchFromScheduledTaskQueue(); - _taskQueue.TryDequeue(out task); } else { diff --git a/test/DotNetty.Transport.Tests/Channel/Local/LocalChannelTest.cs b/test/DotNetty.Transport.Tests/Channel/Local/LocalChannelTest.cs index a6d6268fb..953e5d9aa 100644 --- a/test/DotNetty.Transport.Tests/Channel/Local/LocalChannelTest.cs +++ b/test/DotNetty.Transport.Tests/Channel/Local/LocalChannelTest.cs @@ -995,6 +995,7 @@ public TestHandler0(IPromise promise, IPromise assertPromise) public override void ChannelActive(IChannelHandlerContext ctx) { + Thread.Sleep(10); // Ensure the promise was done before the handler method is triggered. if (_promise.IsCompleted) {