Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes ByteBuffer+Stream integration, TLS neg, STEE Shutdown #91

Merged
merged 1 commit into from
Apr 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions examples/Echo.Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Echo.Client
using System.Net;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
Expand Down Expand Up @@ -42,10 +43,12 @@ static async Task RunClient()
string targetHost = cert.GetNameInfo(X509NameType.DnsName, false);
pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

pipeline.AddLast(new EchoClientHandler());
}));

IChannel bootstrapChannel = await bootstrap.ConnectAsync(new IPEndPoint(EchoClientSettings.Host, EchoClientSettings.Port));

Console.ReadLine();
Expand All @@ -64,4 +67,4 @@ static void Main(string[] args)
Task.Run(() => RunClient()).Wait();
}
}
}
}
5 changes: 4 additions & 1 deletion examples/Echo.Server/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Echo.Server
using System.Diagnostics.Tracing;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
Expand Down Expand Up @@ -41,6 +42,8 @@ static async Task RunServer()
{
pipeline.AddLast(TlsHandler.Server(new X509Certificate2("dotnetty.com.pfx", "password")));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

pipeline.AddLast(new EchoServerHandler());
}));
Expand All @@ -63,4 +66,4 @@ static void Main(string[] args)
Task.Run(() => RunServer()).Wait();
}
}
}
}
3 changes: 2 additions & 1 deletion src/DotNetty.Buffers/ByteBufferUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -635,10 +635,11 @@ public static string DecodeString(IByteBuffer src, int readerIndex, int len, Enc
else
{
IByteBuffer buffer = src.Allocator.Buffer(len);
Contract.Assert(buffer.HasArray, "Operation expects allocator to operate array-based buffers.");
try
{
buffer.WriteBytes(src, readerIndex, len);
return encoding.GetString(buffer.Array, 0, len);
return encoding.GetString(buffer.Array, buffer.ArrayOffset, len);
}
finally
{
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetty.Buffers/UnpooledHeapByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public override IByteBuffer GetBytes(int index, byte[] dst, int dstIndex, int le

public override IByteBuffer GetBytes(int index, Stream destination, int length)
{
destination.Write(this.Array, this.ArrayOffset + this.ReaderIndex, this.ReadableBytes);
destination.Write(this.Array, this.ArrayOffset + index, length);
return this;
}

Expand Down
62 changes: 24 additions & 38 deletions src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
const int ST_TERMINATED = 5;
const string DefaultWorkerThreadName = "SingleThreadEventExecutor worker";

static readonly IRunnable WAKEUP_TASK = new NoOpRunnable();

static readonly IInternalLogger Logger =
InternalLoggerFactory.GetInstance<SingleThreadEventExecutor>();

Expand Down Expand Up @@ -70,15 +72,12 @@ void Loop()
Task.Factory.StartNew(
() =>
{
if (Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED) == ST_NOT_STARTED)
Interlocked.CompareExchange(ref this.executionState, ST_STARTED, ST_NOT_STARTED);
while (!this.ConfirmShutdown())
{
while (!this.ConfirmShutdown())
{
this.RunAllTasks(this.preciseBreakoutInterval);
}

this.CleanupAndTerminate(true);
this.RunAllTasks(this.preciseBreakoutInterval);
}
this.CleanupAndTerminate(true);
},
CancellationToken.None,
TaskCreationOptions.None,
Expand Down Expand Up @@ -120,6 +119,14 @@ public override void Execute(IRunnable task)
}
}

protected void WakeUp(bool inEventLoop)
{
if (!inEventLoop || this.executionState == ST_SHUTTING_DOWN)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to if (!inEventLoop || IsShuttingDown)

Copy link
Member Author

@nayato nayato Apr 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's difference between IsShuttingDown is >= and WakeUp uses ==

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, right. guess there's no point in waking up after we've terminated.

{
this.Execute(WAKEUP_TASK);
}
}

public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
{
Contract.Requires(quietPeriod >= TimeSpan.Zero);
Expand Down Expand Up @@ -174,10 +181,10 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
// scheduleExecution();
//}

//if (wakeup)
//{
// wakeup(inEventLoop);
//}
if (wakeup)
{
this.WakeUp(inEventLoop);
}

return this.TerminationCompletion;
}
Expand All @@ -189,10 +196,7 @@ protected bool ConfirmShutdown()
return false;
}

if (!this.InEventLoop)
{
throw new InvalidOperationException("must be invoked from an event loop");
}
Contract.Assert(this.InEventLoop, "must be invoked from an event loop");

this.CancelScheduledTasks();

Expand All @@ -210,8 +214,7 @@ protected bool ConfirmShutdown()
}

// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
// todo: ???
//wakeup(true);
this.WakeUp(true);
return false;
}

Expand All @@ -227,7 +230,7 @@ protected bool ConfirmShutdown()
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
// todo: ???
//wakeup(true);
this.WakeUp(true);
Thread.Sleep(100);

return false;
Expand All @@ -243,7 +246,6 @@ protected void CleanupAndTerminate(bool success)
while (true)
{
int oldState = this.executionState;
;
if (oldState >= ST_SHUTTING_DOWN || Interlocked.CompareExchange(ref this.executionState, ST_SHUTTING_DOWN, oldState) == oldState)
{
break;
Expand Down Expand Up @@ -398,7 +400,7 @@ IRunnable PollTask()
if (task == null)
{
this.emptyEvent.Reset();
if ((task = this.taskQueue.Dequeue()) == null) // revisit queue as producer might have put a task in meanwhile
if ((task = this.taskQueue.Dequeue()) == null && !this.IsShuttingDown) // revisit queue as producer might have put a task in meanwhile
{
IScheduledRunnable nextScheduledTask = this.ScheduledTaskQueue.Peek();
if (nextScheduledTask != null)
Expand All @@ -424,27 +426,11 @@ IRunnable PollTask()
return task;
}

#region IDisposable members

public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}

public void Dispose(bool isDisposing)
sealed class NoOpRunnable : IRunnable
{
if (!this.disposed)
public void Run()
{
if (isDisposing)
{
this.thread = null;
}
}

this.disposed = true;
}

#endregion
}
}
2 changes: 1 addition & 1 deletion src/DotNetty.Handlers/Tls/TlsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ bool EnsureAuthenticated()
this.state = oldState | State.Authenticating;
if (this.isServer)
{
this.sslStream.AuthenticateAsServerAsync(this.certificate) // todo: change to begin/end
this.sslStream.AuthenticateAsServerAsync(this.certificate, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false) // todo: change to begin/end
.ContinueWith(AuthenticationCompletionCallback, this, TaskContinuationOptions.ExecuteSynchronously);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ public async Task ScheduledTaskFiresOnTimeWhileBusy()
Assert.True(task.IsCompleted);
}

[Theory]
[InlineData(0)]
[InlineData(200)]
public async Task ShutdownWhileIdle(int delayInMs)
{
var scheduler = new SingleThreadEventExecutor("test", TimeSpan.FromMilliseconds(10));
if (delayInMs > 0)
{
Thread.Sleep(delayInMs);
}
Task shutdownTask = scheduler.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(1));
await Task.WhenAny(shutdownTask, Task.Delay(TimeSpan.FromSeconds(5)));
Assert.True(shutdownTask.IsCompleted);
}

class Container<T>
{
public T Value;
Expand Down
1 change: 1 addition & 0 deletions test/DotNetty.Tests.End2End/DotNetty.Tests.End2End.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<NuGetPackageImportStamp>8624fbb3</NuGetPackageImportStamp>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
Expand Down
14 changes: 9 additions & 5 deletions test/DotNetty.Tests.End2End/End2EndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace DotNetty.Tests.End2End
using System.Text;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Codecs.Mqtt;
using DotNetty.Codecs.Mqtt.Packets;
using DotNetty.Common.Concurrency;
Expand Down Expand Up @@ -49,7 +50,9 @@ public async void EchoServerAndClient()
var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
Func<Task> closeServerFunc = await this.StartServerAsync(true, ch =>
{
ch.Pipeline.AddLast(TlsHandler.Server(tlsCertificate));
ch.Pipeline.AddLast("server tls", TlsHandler.Server(tlsCertificate));
ch.Pipeline.AddLast("server prepender", new LengthFieldPrepender(2));
ch.Pipeline.AddLast("server decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
ch.Pipeline.AddLast(new EchoChannelHandler());
}, testPromise);

Expand All @@ -61,7 +64,9 @@ public async void EchoServerAndClient()
.Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
{
string targetHost = tlsCertificate.GetNameInfo(X509NameType.DnsName, false);
ch.Pipeline.AddLast(TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
ch.Pipeline.AddLast("client tls", TlsHandler.Client(targetHost, null, (sender, certificate, chain, errors) => true));
ch.Pipeline.AddLast("client prepender", new LengthFieldPrepender(2));
ch.Pipeline.AddLast("client decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
ch.Pipeline.AddLast(new TestScenarioRunner(this.GetEchoClientScenario, testPromise));
}));

Expand All @@ -74,8 +79,8 @@ public async void EchoServerAndClient()

this.Output.WriteLine("Connected channel: {0}", clientChannel);

await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromMinutes(1)));
Assert.True(testPromise.Task.IsCompleted);
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromSeconds(30)));
Assert.True(testPromise.Task.IsCompleted, "timed out");
testPromise.Task.Wait();
}
finally
Expand Down Expand Up @@ -277,7 +282,6 @@ async Task<Func<Task>> StartServerAsync(bool tcpNoDelay, Action<IChannel> childH
var bossGroup = new MultithreadEventLoopGroup(1);
var workerGroup = new MultithreadEventLoopGroup();
bool started = false;
//var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
try
{
ServerBootstrap b = new ServerBootstrap()
Expand Down