Skip to content

Commit

Permalink
Update TcpSocketChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
cuteant committed Aug 19, 2020
1 parent c382d18 commit 78eeb31
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 57 deletions.
16 changes: 8 additions & 8 deletions src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class SingleThreadEventExecutor : AbstractScheduledEventExecutor
private readonly TaskScheduler _taskScheduler;
private readonly CountdownEvent _threadLock;
private readonly IPromise _terminationCompletionSource;
private int v_executionState = NotStartedState;
private volatile int v_executionState = NotStartedState;

protected readonly IQueue<IRunnable> _taskQueue;
private readonly IBlockingQueue<IRunnable> _blockingTaskQueue;
Expand Down Expand Up @@ -229,13 +229,13 @@ private SingleThreadEventExecutor(IEventExecutorGroup parent, bool addTaskWakesU
public virtual int PendingTasks => _taskQueue.Count;

/// <inheritdoc />
public override bool IsShuttingDown => (uint)Volatile.Read(ref v_executionState) >= ShuttingDownState;
public override bool IsShuttingDown => (uint)v_executionState >= ShuttingDownState;

/// <inheritdoc />
public override bool IsShutdown => (uint)Volatile.Read(ref v_executionState) >= ShutdownState;
public override bool IsShutdown => (uint)v_executionState >= ShutdownState;

/// <inheritdoc />
public override bool IsTerminated => (uint)Volatile.Read(ref v_executionState) >=/*==*/ TerminatedState;
public override bool IsTerminated => (uint)v_executionState >=/*==*/ TerminatedState;

/// <inheritdoc />
public override Task TerminationCompletion => _terminationCompletionSource.Task;
Expand All @@ -247,7 +247,7 @@ private SingleThreadEventExecutor(IEventExecutorGroup parent, bool addTaskWakesU

protected long GracefulShutdownStartTime => _gracefulShutdownStartTime;

protected int ExecutionState => Volatile.Read(ref v_executionState);
protected int ExecutionState => v_executionState;

#endregion

Expand Down Expand Up @@ -328,7 +328,7 @@ protected bool CompareAndSetExecutionState(int currentState, int newState)

protected void SetExecutionState(int newState)
{
var currentState = Volatile.Read(ref v_executionState);
var currentState = v_executionState;
int oldState;
do
{
Expand Down Expand Up @@ -734,7 +734,7 @@ protected virtual void Cleanup()

protected internal virtual void WakeUp(bool inEventLoop)
{
if (!inEventLoop/* || (Volatile.Read(ref v_executionState) == ST_SHUTTING_DOWN)*/)
if (!inEventLoop/* || (v_executionState == ST_SHUTTING_DOWN)*/)
{
// Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
// is already something in the queue.
Expand Down Expand Up @@ -834,7 +834,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time

bool inEventLoop = InEventLoop;
bool wakeup;
int thisState = Volatile.Read(ref v_executionState);
int thisState = v_executionState;
int oldState;
do
{
Expand Down
64 changes: 47 additions & 17 deletions src/DotNetty.Transport/Channels/Sockets/AbstractSocketChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,7 @@ public abstract partial class AbstractSocketChannel<TChannel, TUnsafe> : Abstrac

private SocketChannelAsyncOperation<TChannel, TUnsafe> _readOperation;
private SocketChannelAsyncOperation<TChannel, TUnsafe> _writeOperation;
private int v_state;
private int InternalState
{
[MethodImpl(InlineMethod.AggressiveInlining)]
get => Volatile.Read(ref v_state);
set => Interlocked.Exchange(ref v_state, value);
}
private volatile int v_state;

private IPromise _connectPromise;
private IScheduledTask _connectCancellationTask;
Expand All @@ -65,7 +59,7 @@ protected AbstractSocketChannel(IChannel parent, Socket socket)
: base(parent)
{
Socket = socket;
InternalState = StateFlags.Open;
v_state = StateFlags.Open;

try
{
Expand Down Expand Up @@ -129,35 +123,71 @@ protected internal void ClearReadPending()

void ClearReadPending0() => ReadPending = false;

protected void SetState(int stateToSet) => InternalState |= stateToSet;
[MethodImpl(InlineMethod.AggressiveOptimization)]
protected void SetState(int stateToSet) => v_state |= stateToSet;

/// <returns>state before modification</returns>
protected int ResetState(int stateToReset)
{
var oldState = InternalState;
var oldState = v_state;
if ((oldState & stateToReset) != 0)
{
InternalState = oldState & ~stateToReset;
v_state = oldState & ~stateToReset;
}
return oldState;
}

protected bool TryResetState(int stateToReset)
{
var oldState = InternalState;
var oldState = v_state;
if ((oldState & stateToReset) != 0)
{
InternalState = oldState & ~stateToReset;
v_state = oldState & ~stateToReset;
return true;
}
return false;
}

protected bool IsInState(int stateToCheck) => (InternalState & stateToCheck) == stateToCheck;
[MethodImpl(InlineMethod.AggressiveOptimization)]
protected bool IsInState(int stateToCheck) => 0u >= (uint)((v_state & stateToCheck) - stateToCheck);

protected SocketChannelAsyncOperation<TChannel, TUnsafe> ReadOperation
{
[MethodImpl(InlineMethod.AggressiveOptimization)]
get => _readOperation ?? EnsureReadOperationCreated();
}

[MethodImpl(MethodImplOptions.NoInlining)]
private SocketChannelAsyncOperation<TChannel, TUnsafe> EnsureReadOperationCreated()
{
lock (this)
{
if (_readOperation is null)
{
_readOperation = new SocketChannelAsyncOperation<TChannel, TUnsafe>((TChannel)this, true);
}
}
return _readOperation;
}

protected SocketChannelAsyncOperation<TChannel, TUnsafe> ReadOperation => _readOperation ??= new SocketChannelAsyncOperation<TChannel, TUnsafe>((TChannel)this, true);
private SocketChannelAsyncOperation<TChannel, TUnsafe> WriteOperation
{
[MethodImpl(InlineMethod.AggressiveOptimization)]
get => _writeOperation ?? EnsureWriteOperationCreated();
}

SocketChannelAsyncOperation<TChannel, TUnsafe> WriteOperation => _writeOperation ??= new SocketChannelAsyncOperation<TChannel, TUnsafe>((TChannel)this, false);
[MethodImpl(MethodImplOptions.NoInlining)]
private SocketChannelAsyncOperation<TChannel, TUnsafe> EnsureWriteOperationCreated()
{
lock (this)
{
if (_writeOperation is null)
{
_writeOperation = new SocketChannelAsyncOperation<TChannel, TUnsafe>((TChannel)this, false);
}
}
return _writeOperation;
}

#if NETCOREAPP || NETSTANDARD_2_0_GREATER
protected SocketChannelAsyncOperation<TChannel, TUnsafe> PrepareWriteOperation(in ReadOnlyMemory<byte> buffer)
Expand Down Expand Up @@ -266,7 +296,7 @@ protected override void DoBeginRead()

if (!IsInState(StateFlags.ReadScheduled))
{
InternalState |= StateFlags.ReadScheduled;
v_state |= StateFlags.ReadScheduled;
ScheduleSocketRead();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace DotNetty.Transport.Channels.Sockets
using System;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;

public sealed class TcpServerSocketChannel : TcpServerSocketChannel<TcpServerSocketChannel, TcpSocketChannelFactory>
{
Expand Down Expand Up @@ -103,7 +104,24 @@ public override bool IsActive

protected override EndPoint LocalAddressInternal => Socket.LocalEndPoint;

SocketChannelAsyncOperation<TServerChannel, TcpServerSocketChannelUnsafe> AcceptOperation => _acceptOperation ??= new SocketChannelAsyncOperation<TServerChannel, TcpServerSocketChannelUnsafe>((TServerChannel)this, false);
private SocketChannelAsyncOperation<TServerChannel, TcpServerSocketChannelUnsafe> AcceptOperation
{
[MethodImpl(InlineMethod.AggressiveOptimization)]
get => _acceptOperation ?? EnsureAcceptOperationCreated();
}

[MethodImpl(MethodImplOptions.NoInlining)]
private SocketChannelAsyncOperation<TServerChannel, TcpServerSocketChannelUnsafe> EnsureAcceptOperationCreated()
{
lock (this)
{
if (_acceptOperation is null)
{
_acceptOperation = new SocketChannelAsyncOperation<TServerChannel, TcpServerSocketChannelUnsafe>((TServerChannel)this, false);
}
}
return _acceptOperation;
}

protected override void DoBind(EndPoint localAddress)
{
Expand Down
59 changes: 28 additions & 31 deletions src/DotNetty.Transport/Channels/Sockets/TcpSocketChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -409,39 +409,36 @@ protected override void DoWrite(ChannelOutboundBuffer input)
List<ArraySegment<byte>> bufferList = sharedBufferList;
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt)
if(0u >= (uint)nioBufferCnt)
{
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
base.DoWrite(input);
return;
default:
for (int i = writeSpinCount - 1; i >= 0; i--)
{
long localWrittenBytes = socket.Send(bufferList, SocketFlags.None, out SocketError errorCode);
if (errorCode != SocketError.Success && errorCode != SocketError.WouldBlock)
{
ThrowHelper.ThrowSocketException(errorCode);
}

if (0ul >= (ulong)localWrittenBytes)
{
break;
}

expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (0ul >= (ulong)expectedWrittenBytes)
{
done = true;
break;
}
else
{
bufferList = AdjustBufferList(localWrittenBytes, bufferList);
}
}
// We have something else beside ByteBuffers to write so fallback to normal writes.
base.DoWrite(input);
return;
}
for (int i = writeSpinCount - 1; i >= 0; i--)
{
long localWrittenBytes = socket.Send(bufferList, SocketFlags.None, out SocketError errorCode);
if (errorCode != SocketError.Success && errorCode != SocketError.WouldBlock)
{
ThrowHelper.ThrowSocketException(errorCode);
}

if (0ul >= (ulong)localWrittenBytes)
{
break;
}

expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (0ul >= (ulong)expectedWrittenBytes)
{
done = true;
break;
}
else
{
bufferList = AdjustBufferList(localWrittenBytes, bufferList);
}
}

if (writtenBytes > 0)
Expand Down

0 comments on commit 78eeb31

Please sign in to comment.