From aa3cbc67662ea831f59025565030ac9b3ce69bfe Mon Sep 17 00:00:00 2001 From: Max Gortman Date: Wed, 27 Apr 2016 00:29:56 -0700 Subject: [PATCH] Fixes ByteBuffer+Stream integration, TLS neg, STEE Shutdown Motivation: Fix up top priority issues to ensure proper working state with recent changes. Modifications: - TlsHandler negotiates TLS 1.0+ on server side (#89) - STEE properly supports graceful shutdown (#7) - UnpooledHeapByteBuffer.GetBytes honors received index and length (#88) - Echo E2E test uses length-prefix based framing (#90) Result: Setting up DotNetty does not require workarounds for shutdown and hacks to enable negotiation of higher versions of TLS. Tests are passing even with new SslStream behavior. --- examples/Echo.Client/Program.cs | 7 ++- examples/Echo.Server/Program.cs | 5 +- src/DotNetty.Buffers/ByteBufferUtil.cs | 3 +- .../UnpooledHeapByteBuffer.cs | 2 +- .../Concurrency/SingleThreadEventExecutor.cs | 62 +++++++------------ src/DotNetty.Handlers/Tls/TlsHandler.cs | 2 +- .../SingleThreadEventExecutorTests.cs | 15 +++++ .../DotNetty.Tests.End2End.csproj | 1 + test/DotNetty.Tests.End2End/End2EndTests.cs | 14 +++-- 9 files changed, 62 insertions(+), 49 deletions(-) diff --git a/examples/Echo.Client/Program.cs b/examples/Echo.Client/Program.cs index b361d22af..266625369 100644 --- a/examples/Echo.Client/Program.cs +++ b/examples/Echo.Client/Program.cs @@ -8,6 +8,7 @@ namespace Echo.Client using System.Net; using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; + using DotNetty.Codecs; using DotNetty.Common.Internal.Logging; using DotNetty.Handlers.Tls; using DotNetty.Transport.Bootstrapping; @@ -42,10 +43,12 @@ static async Task RunClient() string targetHost = cert.GetNameInfo(X509NameType.DnsName, false); pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true)); } + pipeline.AddLast(new LengthFieldPrepender(2)); + pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); pipeline.AddLast(new EchoClientHandler()); })); - + IChannel bootstrapChannel = await bootstrap.ConnectAsync(new IPEndPoint(EchoClientSettings.Host, EchoClientSettings.Port)); Console.ReadLine(); @@ -64,4 +67,4 @@ static void Main(string[] args) Task.Run(() => RunClient()).Wait(); } } -} +} \ No newline at end of file diff --git a/examples/Echo.Server/Program.cs b/examples/Echo.Server/Program.cs index 7a86284fd..49b7e21a2 100644 --- a/examples/Echo.Server/Program.cs +++ b/examples/Echo.Server/Program.cs @@ -7,6 +7,7 @@ namespace Echo.Server using System.Diagnostics.Tracing; using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; + using DotNetty.Codecs; using DotNetty.Common.Internal.Logging; using DotNetty.Handlers.Logging; using DotNetty.Handlers.Tls; @@ -41,6 +42,8 @@ static async Task RunServer() { pipeline.AddLast(TlsHandler.Server(new X509Certificate2("dotnetty.com.pfx", "password"))); } + pipeline.AddLast(new LengthFieldPrepender(2)); + pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); pipeline.AddLast(new EchoServerHandler()); })); @@ -63,4 +66,4 @@ static void Main(string[] args) Task.Run(() => RunServer()).Wait(); } } -} +} \ No newline at end of file diff --git a/src/DotNetty.Buffers/ByteBufferUtil.cs b/src/DotNetty.Buffers/ByteBufferUtil.cs index f70d6babc..d9038a77e 100644 --- a/src/DotNetty.Buffers/ByteBufferUtil.cs +++ b/src/DotNetty.Buffers/ByteBufferUtil.cs @@ -635,10 +635,11 @@ public static string DecodeString(IByteBuffer src, int readerIndex, int len, Enc else { IByteBuffer buffer = src.Allocator.Buffer(len); + Contract.Assert(buffer.HasArray, "Operation expects allocator to operate array-based buffers."); try { buffer.WriteBytes(src, readerIndex, len); - return encoding.GetString(buffer.Array, 0, len); + return encoding.GetString(buffer.Array, buffer.ArrayOffset, len); } finally { diff --git a/src/DotNetty.Buffers/UnpooledHeapByteBuffer.cs b/src/DotNetty.Buffers/UnpooledHeapByteBuffer.cs index ab5ed2c08..ad47509e2 100644 --- a/src/DotNetty.Buffers/UnpooledHeapByteBuffer.cs +++ b/src/DotNetty.Buffers/UnpooledHeapByteBuffer.cs @@ -148,7 +148,7 @@ public override IByteBuffer GetBytes(int index, byte[] dst, int dstIndex, int le public override IByteBuffer GetBytes(int index, Stream destination, int length) { - destination.Write(this.Array, this.ArrayOffset + this.ReaderIndex, this.ReadableBytes); + destination.Write(this.Array, this.ArrayOffset + index, length); return this; } diff --git a/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs b/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs index ab1f6a522..c27f72bd2 100644 --- a/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs +++ b/src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs @@ -21,6 +21,8 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor const int ST_TERMINATED = 5; const string DefaultWorkerThreadName = "SingleThreadEventExecutor worker"; + static readonly IRunnable WAKEUP_TASK = new NoOpRunnable(); + static readonly IInternalLogger Logger = InternalLoggerFactory.GetInstance(); @@ -70,15 +72,12 @@ void Loop() Task.Factory.StartNew( () => { - if (Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED) == ST_NOT_STARTED) + Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED); + while (!this.ConfirmShutdown()) { - while (!this.ConfirmShutdown()) - { - this.RunAllTasks(this.preciseBreakoutInterval); - } - - this.CleanupAndTerminate(true); + this.RunAllTasks(this.preciseBreakoutInterval); } + this.CleanupAndTerminate(true); }, CancellationToken.None, TaskCreationOptions.None, @@ -120,6 +119,14 @@ public override void Execute(IRunnable task) } } + protected void WakeUp(bool inEventLoop) + { + if (!inEventLoop || this.executionState == ST_SHUTTING_DOWN) + { + this.Execute(WAKEUP_TASK); + } + } + public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout) { Contract.Requires(quietPeriod >= TimeSpan.Zero); @@ -174,10 +181,10 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time // scheduleExecution(); //} - //if (wakeup) - //{ - // wakeup(inEventLoop); - //} + if (wakeup) + { + this.WakeUp(inEventLoop); + } return this.TerminationCompletion; } @@ -189,10 +196,7 @@ protected bool ConfirmShutdown() return false; } - if (!this.InEventLoop) - { - throw new InvalidOperationException("must be invoked from an event loop"); - } + Contract.Assert(this.InEventLoop, "must be invoked from an event loop"); this.CancelScheduledTasks(); @@ -210,8 +214,7 @@ protected bool ConfirmShutdown() } // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period. - // todo: ??? - //wakeup(true); + this.WakeUp(true); return false; } @@ -227,7 +230,7 @@ protected bool ConfirmShutdown() // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. // todo: ??? - //wakeup(true); + this.WakeUp(true); Thread.Sleep(100); return false; @@ -243,7 +246,6 @@ protected void CleanupAndTerminate(bool success) while (true) { int oldState = this.executionState; - ; if (oldState >= ST_SHUTTING_DOWN || Interlocked.CompareExchange(ref this.executionState, ST_SHUTTING_DOWN, oldState) == oldState) { break; @@ -398,7 +400,7 @@ IRunnable PollTask() if (task == null) { this.emptyEvent.Reset(); - if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile + if ((task = this.taskQueue.Dequeue()) == null && !this.IsShuttingDown) // revisit queue as producer might have put a task in meanwhile { IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek(); if (nextScheduledTask != null) @@ -424,27 +426,11 @@ IRunnable PollTask() return task; } - #region IDisposable members - - public void Dispose() - { - this.Dispose(true); - GC.SuppressFinalize(this); - } - - public void Dispose(bool isDisposing) + sealed class NoOpRunnable : IRunnable { - if (!this.disposed) + public void Run() { - if (isDisposing) - { - this.thread = null; - } } - - this.disposed = true; } - - #endregion } } \ No newline at end of file diff --git a/src/DotNetty.Handlers/Tls/TlsHandler.cs b/src/DotNetty.Handlers/Tls/TlsHandler.cs index 8593048c7..457f3de3e 100644 --- a/src/DotNetty.Handlers/Tls/TlsHandler.cs +++ b/src/DotNetty.Handlers/Tls/TlsHandler.cs @@ -247,7 +247,7 @@ bool EnsureAuthenticated() this.state = oldState | State.Authenticating; if (this.isServer) { - this.sslStream.AuthenticateAsServerAsync(this.certificate) // todo: change to begin/end + this.sslStream.AuthenticateAsServerAsync(this.certificate, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false) // todo: change to begin/end .ContinueWith(AuthenticationCompletionCallback, this, TaskContinuationOptions.ExecuteSynchronously); } else diff --git a/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs b/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs index 0e14a9e1b..1b9cc4c21 100644 --- a/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs +++ b/test/DotNetty.Common.Tests/Concurrency/SingleThreadEventExecutorTests.cs @@ -118,6 +118,21 @@ public async Task ScheduledTaskFiresOnTimeWhileBusy() Assert.True(task.IsCompleted); } + [Theory] + [InlineData(0)] + [InlineData(200)] + public async Task ShutdownWhileIdle(int delayInMs) + { + var scheduler = new SingleThreadEventExecutor("test", TimeSpan.FromMilliseconds(10)); + if (delayInMs > 0) + { + Thread.Sleep(delayInMs); + } + Task shutdownTask = scheduler.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(1)); + await Task.WhenAny(shutdownTask, Task.Delay(TimeSpan.FromSeconds(5))); + Assert.True(shutdownTask.IsCompleted); + } + class Container { public T Value; diff --git a/test/DotNetty.Tests.End2End/DotNetty.Tests.End2End.csproj b/test/DotNetty.Tests.End2End/DotNetty.Tests.End2End.csproj index 14cac9045..ae28be5c8 100644 --- a/test/DotNetty.Tests.End2End/DotNetty.Tests.End2End.csproj +++ b/test/DotNetty.Tests.End2End/DotNetty.Tests.End2End.csproj @@ -21,6 +21,7 @@ 8624fbb3 ..\..\ true + true diff --git a/test/DotNetty.Tests.End2End/End2EndTests.cs b/test/DotNetty.Tests.End2End/End2EndTests.cs index 0539003dd..54f06340d 100644 --- a/test/DotNetty.Tests.End2End/End2EndTests.cs +++ b/test/DotNetty.Tests.End2End/End2EndTests.cs @@ -11,6 +11,7 @@ namespace DotNetty.Tests.End2End using System.Text; using System.Threading.Tasks; using DotNetty.Buffers; + using DotNetty.Codecs; using DotNetty.Codecs.Mqtt; using DotNetty.Codecs.Mqtt.Packets; using DotNetty.Common.Concurrency; @@ -49,7 +50,9 @@ public async void EchoServerAndClient() var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password"); Func closeServerFunc = await this.StartServerAsync(true, ch => { - ch.Pipeline.AddLast(TlsHandler.Server(tlsCertificate)); + ch.Pipeline.AddLast("server tls", TlsHandler.Server(tlsCertificate)); + ch.Pipeline.AddLast("server prepender", new LengthFieldPrepender(2)); + ch.Pipeline.AddLast("server decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); ch.Pipeline.AddLast(new EchoChannelHandler()); }, testPromise); @@ -61,7 +64,9 @@ public async void EchoServerAndClient() .Handler(new ActionChannelInitializer(ch => { string targetHost = tlsCertificate.GetNameInfo(X509NameType.DnsName, false); - ch.Pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true)); + ch.Pipeline.AddLast("client tls", TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true)); + ch.Pipeline.AddLast("client prepender", new LengthFieldPrepender(2)); + ch.Pipeline.AddLast("client decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); ch.Pipeline.AddLast(new TestScenarioRunner(this.GetEchoClientScenario, testPromise)); })); @@ -74,8 +79,8 @@ public async void EchoServerAndClient() this.Output.WriteLine("Connected channel: {0}", clientChannel); - await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromMinutes(1))); - Assert.True(testPromise.Task.IsCompleted); + await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromSeconds(30))); + Assert.True(testPromise.Task.IsCompleted, "timed out"); testPromise.Task.Wait(); } finally @@ -277,7 +282,6 @@ async Task> StartServerAsync(bool tcpNoDelay, Action childH var bossGroup = new MultithreadEventLoopGroup(1); var workerGroup = new MultithreadEventLoopGroup(); bool started = false; - //var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password"); try { ServerBootstrap b = new ServerBootstrap()