Skip to content

Commit

Permalink
Merge pull request #91 from nayato/shutdown
Browse files Browse the repository at this point in the history
Fixes ByteBuffer+Stream integration, TLS neg, STEE Shutdown
  • Loading branch information
nayato committed Apr 27, 2016
2 parents d3e6fe4 + aa3cbc6 commit 66caa64
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 49 deletions.
7 changes: 5 additions & 2 deletions examples/Echo.Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Echo.Client
using System.Net;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
Expand Down Expand Up @@ -42,10 +43,12 @@ static async Task RunClient()
string targetHost = cert.GetNameInfo(X509NameType.DnsName, false);
pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

pipeline.AddLast(new EchoClientHandler());
}));

IChannel bootstrapChannel = await bootstrap.ConnectAsync(new IPEndPoint(EchoClientSettings.Host, EchoClientSettings.Port));

Console.ReadLine();
Expand All @@ -64,4 +67,4 @@ static void Main(string[] args)
Task.Run(() => RunClient()).Wait();
}
}
}
}
5 changes: 4 additions & 1 deletion examples/Echo.Server/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Echo.Server
using System.Diagnostics.Tracing;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
Expand Down Expand Up @@ -41,6 +42,8 @@ static async Task RunServer()
{
pipeline.AddLast(TlsHandler.Server(new X509Certificate2("dotnetty.com.pfx", "password")));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

pipeline.AddLast(new EchoServerHandler());
}));
Expand All @@ -63,4 +66,4 @@ static void Main(string[] args)
Task.Run(() => RunServer()).Wait();
}
}
}
}
3 changes: 2 additions & 1 deletion src/DotNetty.Buffers/ByteBufferUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -635,10 +635,11 @@ public static string DecodeString(IByteBuffer src, int readerIndex, int len, Enc
else
{
IByteBuffer buffer = src.Allocator.Buffer(len);
Contract.Assert(buffer.HasArray, "Operation expects allocator to operate array-based buffers.");
try
{
buffer.WriteBytes(src, readerIndex, len);
return encoding.GetString(buffer.Array, 0, len);
return encoding.GetString(buffer.Array, buffer.ArrayOffset, len);
}
finally
{
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Buffers/UnpooledHeapByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public override IByteBuffer GetBytes(int index, byte[] dst, int dstIndex, int le

public override IByteBuffer GetBytes(int index, Stream destination, int length)
{
destination.Write(this.Array, this.ArrayOffset + this.ReaderIndex, this.ReadableBytes);
destination.Write(this.Array, this.ArrayOffset + index, length);
return this;
}

Expand Down
62 changes: 24 additions & 38 deletions src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
const int ST_TERMINATED = 5;
const string DefaultWorkerThreadName = "SingleThreadEventExecutor worker";

static readonly IRunnable WAKEUP_TASK = new NoOpRunnable();

static readonly IInternalLogger Logger =
InternalLoggerFactory.GetInstance<SingleThreadEventExecutor>();

Expand Down Expand Up @@ -70,15 +72,12 @@ void Loop()
Task.Factory.StartNew(
() =>
{
if (Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED) == ST_NOT_STARTED)
Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED);
while (!this.ConfirmShutdown())
{
while (!this.ConfirmShutdown())
{
this.RunAllTasks(this.preciseBreakoutInterval);
}

this.CleanupAndTerminate(true);
this.RunAllTasks(this.preciseBreakoutInterval);
}
this.CleanupAndTerminate(true);
},
CancellationToken.None,
TaskCreationOptions.None,
Expand Down Expand Up @@ -120,6 +119,14 @@ public override void Execute(IRunnable task)
}
}

protected void WakeUp(bool inEventLoop)
{
if (!inEventLoop || this.executionState == ST_SHUTTING_DOWN)
{
this.Execute(WAKEUP_TASK);
}
}

public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
Contract.Requires(quietPeriod >= TimeSpan.Zero);
Expand Down Expand Up @@ -174,10 +181,10 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
// scheduleExecution();
//}

//if (wakeup)
//{
// wakeup(inEventLoop);
//}
if (wakeup)
{
this.WakeUp(inEventLoop);
}

return this.TerminationCompletion;
}
Expand All @@ -189,10 +196,7 @@ protected bool ConfirmShutdown()
return false;
}

if (!this.InEventLoop)
{
throw new InvalidOperationException("must be invoked from an event loop");
}
Contract.Assert(this.InEventLoop, "must be invoked from an event loop");

this.CancelScheduledTasks();

Expand All @@ -210,8 +214,7 @@ protected bool ConfirmShutdown()
}

// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
// todo: ???
//wakeup(true);
this.WakeUp(true);
return false;
}

Expand All @@ -227,7 +230,7 @@ protected bool ConfirmShutdown()
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
// todo: ???
//wakeup(true);
this.WakeUp(true);
Thread.Sleep(100);

return false;
Expand All @@ -243,7 +246,6 @@ protected void CleanupAndTerminate(bool success)
while (true)
{
int oldState = this.executionState;
;
if (oldState >= ST_SHUTTING_DOWN || Interlocked.CompareExchange(ref this.executionState, ST_SHUTTING_DOWN, oldState) == oldState)
{
break;
Expand Down Expand Up @@ -398,7 +400,7 @@ IRunnable PollTask()
if (task == null)
{
this.emptyEvent.Reset();
if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile
if ((task = this.taskQueue.Dequeue()) == null && !this.IsShuttingDown) // revisit queue as producer might have put a task in meanwhile
{
IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek();
if (nextScheduledTask != null)
Expand All @@ -424,27 +426,11 @@ IRunnable PollTask()
return task;
}

#region IDisposable members

public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}

public void Dispose(bool isDisposing)
sealed class NoOpRunnable : IRunnable
{
if (!this.disposed)
public void Run()
{
if (isDisposing)
{
this.thread = null;
}
}

this.disposed = true;
}

#endregion
}
}
2 changes: 1 addition & 1 deletion src/DotNetty.Handlers/Tls/TlsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ bool EnsureAuthenticated()
this.state = oldState | State.Authenticating;
if (this.isServer)
{
this.sslStream.AuthenticateAsServerAsync(this.certificate) // todo: change to begin/end
this.sslStream.AuthenticateAsServerAsync(this.certificate, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false) // todo: change to begin/end
.ContinueWith(AuthenticationCompletionCallback, this, TaskContinuationOptions.ExecuteSynchronously);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ public async Task ScheduledTaskFiresOnTimeWhileBusy()
Assert.True(task.IsCompleted);
}

[Theory]
[InlineData(0)]
[InlineData(200)]
public async Task ShutdownWhileIdle(int delayInMs)
{
var scheduler = new SingleThreadEventExecutor("test", TimeSpan.FromMilliseconds(10));
if (delayInMs > 0)
{
Thread.Sleep(delayInMs);
}
Task shutdownTask = scheduler.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(1));
await Task.WhenAny(shutdownTask, Task.Delay(TimeSpan.FromSeconds(5)));
Assert.True(shutdownTask.IsCompleted);
}

class Container<T>
{
public T Value;
Expand Down
1 change: 1 addition & 0 deletions test/DotNetty.Tests.End2End/DotNetty.Tests.End2End.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<NuGetPackageImportStamp>8624fbb3</NuGetPackageImportStamp>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
Expand Down
14 changes: 9 additions & 5 deletions test/DotNetty.Tests.End2End/End2EndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace DotNetty.Tests.End2End
using System.Text;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Codecs.Mqtt;
using DotNetty.Codecs.Mqtt.Packets;
using DotNetty.Common.Concurrency;
Expand Down Expand Up @@ -49,7 +50,9 @@ public async void EchoServerAndClient()
var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
Func<Task> closeServerFunc = await this.StartServerAsync(true, ch =>
{
ch.Pipeline.AddLast(TlsHandler.Server(tlsCertificate));
ch.Pipeline.AddLast("server tls", TlsHandler.Server(tlsCertificate));
ch.Pipeline.AddLast("server prepender", new LengthFieldPrepender(2));
ch.Pipeline.AddLast("server decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
ch.Pipeline.AddLast(new EchoChannelHandler());
}, testPromise);

Expand All @@ -61,7 +64,9 @@ public async void EchoServerAndClient()
.Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
{
string targetHost = tlsCertificate.GetNameInfo(X509NameType.DnsName, false);
ch.Pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
ch.Pipeline.AddLast("client tls", TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
ch.Pipeline.AddLast("client prepender", new LengthFieldPrepender(2));
ch.Pipeline.AddLast("client decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
ch.Pipeline.AddLast(new TestScenarioRunner(this.GetEchoClientScenario, testPromise));
}));

Expand All @@ -74,8 +79,8 @@ public async void EchoServerAndClient()

this.Output.WriteLine("Connected channel: {0}", clientChannel);

await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromMinutes(1)));
Assert.True(testPromise.Task.IsCompleted);
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromSeconds(30)));
Assert.True(testPromise.Task.IsCompleted, "timed out");
testPromise.Task.Wait();
}
finally
Expand Down Expand Up @@ -277,7 +282,6 @@ async Task<Func<Task>> StartServerAsync(bool tcpNoDelay, Action<IChannel> childH
var bossGroup = new MultithreadEventLoopGroup(1);
var workerGroup = new MultithreadEventLoopGroup();
bool started = false;
//var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
try
{
ServerBootstrap b = new ServerBootstrap()
Expand Down

0 comments on commit 66caa64

Please sign in to comment.