Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Refactor FrameTests and rename SocketInput SocketOutput properties (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pakrym authored Nov 23, 2016
1 parent 2bf5a96 commit e55c624
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 1,177 deletions.
24 changes: 12 additions & 12 deletions src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context
_bufferSizeControl = new BufferSizeControl(ServerOptions.Limits.MaxRequestBufferSize.Value, this, Thread);
}

SocketInput = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
SocketOutput = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
Input = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
Output = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);

var tcpHandle = _socket as UvTcpHandle;
if (tcpHandle != null)
Expand Down Expand Up @@ -95,7 +95,7 @@ public void Start()
}
else
{
_libuvStream = new LibuvStream(SocketInput, SocketOutput);
_libuvStream = new LibuvStream(Input, Output);

_filterContext = new ConnectionFilterContext
{
Expand Down Expand Up @@ -136,7 +136,7 @@ public void Start()
public Task StopAsync()
{
_frame.StopAsync();
_frame.SocketInput.CompleteAwaiting();
_frame.Input.CompleteAwaiting();

return _socketClosedTcs.Task;
}
Expand Down Expand Up @@ -169,7 +169,7 @@ public virtual void OnSocketClosed()
}
}, this);

SocketInput.Dispose();
Input.Dispose();
_socketClosedTcs.TrySetResult(null);
}

Expand Down Expand Up @@ -197,8 +197,8 @@ private void ApplyConnectionFilter()
{
_filteredStreamAdapter = new FilteredStreamAdapter(ConnectionId, _filterContext.Connection, Thread.Memory, Log, ThreadPool, _bufferSizeControl);

_frame.SocketInput = _filteredStreamAdapter.SocketInput;
_frame.SocketOutput = _filteredStreamAdapter.SocketOutput;
_frame.Input = _filteredStreamAdapter.SocketInput;
_frame.Output = _filteredStreamAdapter.SocketOutput;

_readInputTask = _filteredStreamAdapter.ReadInputAsync();
}
Expand All @@ -214,7 +214,7 @@ private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggested

private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
{
var result = SocketInput.IncomingStart();
var result = Input.IncomingStart();

return handle.Libuv.buf_init(
result.DataArrayPtr + result.End,
Expand All @@ -234,7 +234,7 @@ private void OnRead(UvStreamHandle handle, int status)
// there is no data to be read right now.
// See the note at http://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb.
// We need to clean up whatever was allocated by OnAlloc.
SocketInput.IncomingDeferred();
Input.IncomingDeferred();
return;
}

Expand Down Expand Up @@ -276,7 +276,7 @@ private void OnRead(UvStreamHandle handle, int status)
error = new IOException(uvError.Message, uvError);
}

SocketInput.IncomingComplete(readCount, error);
Input.IncomingComplete(readCount, error);

if (errorDone)
{
Expand All @@ -302,7 +302,7 @@ void IConnectionControl.Resume()
// ReadStart() can throw a UvException in some cases (e.g. socket is no longer connected).
// This should be treated the same as OnRead() seeing a "normalDone" condition.
Log.ConnectionReadFin(ConnectionId);
SocketInput.IncomingComplete(0, null);
Input.IncomingComplete(0, null);
}
}

Expand All @@ -316,7 +316,7 @@ void IConnectionControl.End(ProduceEndType endType)
case ProduceEndType.SocketShutdown:
case ProduceEndType.SocketDisconnect:
Log.ConnectionDisconnect(ConnectionId);
((SocketOutput)SocketOutput).End(endType);
((SocketOutput)Output).End(endType);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public ConnectionContext(ListenerContext context)

public ListenerContext ListenerContext { get; set; }

public SocketInput SocketInput { get; set; }
public SocketInput Input { get; set; }

public ISocketOutput SocketOutput { get; set; }
public ISocketOutput Output { get; set; }

public IConnectionControl ConnectionControl { get; set; }

Expand Down
32 changes: 16 additions & 16 deletions src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public abstract partial class Frame : IFrameControl
public Frame(ConnectionContext context)
{
ConnectionContext = context;
SocketInput = context.SocketInput;
SocketOutput = context.SocketOutput;
Input = context.Input;
Output = context.Output;

ServerOptions = context.ListenerContext.ServiceContext.ServerOptions;

Expand All @@ -95,8 +95,8 @@ public Frame(ConnectionContext context)
}

public ConnectionContext ConnectionContext { get; }
public SocketInput SocketInput { get; set; }
public ISocketOutput SocketOutput { get; set; }
public SocketInput Input { get; set; }
public ISocketOutput Output { get; set; }
public Action<IFeatureCollection> PrepareRequest
{
get
Expand Down Expand Up @@ -531,13 +531,13 @@ protected async Task FireOnCompleted()
public void Flush()
{
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();
SocketOutput.Flush();
Output.Flush();
}

public async Task FlushAsync(CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting();
await SocketOutput.FlushAsync(cancellationToken);
await Output.FlushAsync(cancellationToken);
}

public void Write(ArraySegment<byte> data)
Expand All @@ -564,7 +564,7 @@ public void Write(ArraySegment<byte> data)
}
else
{
SocketOutput.Write(data);
Output.Write(data);
}
}
else
Expand Down Expand Up @@ -599,7 +599,7 @@ public Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationTo
}
else
{
return SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
return Output.WriteAsync(data, cancellationToken: cancellationToken);
}
}
else
Expand Down Expand Up @@ -631,7 +631,7 @@ public async Task WriteAsyncAwaited(ArraySegment<byte> data, CancellationToken c
}
else
{
await SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
await Output.WriteAsync(data, cancellationToken: cancellationToken);
}
}
else
Expand Down Expand Up @@ -675,17 +675,17 @@ protected void VerifyResponseContentLength()

private void WriteChunked(ArraySegment<byte> data)
{
SocketOutput.Write(data, chunk: true);
Output.Write(data, chunk: true);
}

private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
return SocketOutput.WriteAsync(data, chunk: true, cancellationToken: cancellationToken);
return Output.WriteAsync(data, chunk: true, cancellationToken: cancellationToken);
}

private Task WriteChunkedResponseSuffix()
{
return SocketOutput.WriteAsync(_endChunkedResponseBytes);
return Output.WriteAsync(_endChunkedResponseBytes);
}

private static ArraySegment<byte> CreateAsciiByteArraySegment(string text)
Expand All @@ -706,7 +706,7 @@ public void ProduceContinue()
RequestHeaders.TryGetValue("Expect", out expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
SocketOutput.Write(_continueBytes);
Output.Write(_continueBytes);
}
}

Expand Down Expand Up @@ -809,7 +809,7 @@ private async Task ProduceEndAwaited()
ProduceStart(appCompleted: true);

// Force flush
await SocketOutput.FlushAsync();
await Output.FlushAsync();

await WriteSuffix();
}
Expand Down Expand Up @@ -856,7 +856,7 @@ private void CreateResponseHeader(
var hasTransferEncoding = responseHeaders.HasTransferEncoding;
var transferCoding = FrameHeaders.GetFinalTransferCoding(responseHeaders.HeaderTransferEncoding);

var end = SocketOutput.ProducingStart();
var end = Output.ProducingStart();

if (_keepAlive && hasConnection)
{
Expand Down Expand Up @@ -944,7 +944,7 @@ private void CreateResponseHeader(
responseHeaders.CopyTo(ref end);
end.CopyFrom(_bytesEndHeaders, 0, _bytesEndHeaders.Length);

SocketOutput.ProducingComplete(end);
Output.ProducingComplete(end);
}

public RequestLineStatus TakeStartLine(SocketInput input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ public override async Task RequestProcessingAsync()

while (!_requestProcessingStopping)
{
requestLineStatus = TakeStartLine(SocketInput);
requestLineStatus = TakeStartLine(Input);

if (requestLineStatus == RequestLineStatus.Done)
{
break;
}

if (SocketInput.CheckFinOrThrow())
if (Input.CheckFinOrThrow())
{
// We need to attempt to consume start lines and headers even after
// SocketInput.RemoteIntakeFin is set to true to ensure we don't close a
// connection without giving the application a chance to respond to a request
// sent immediately before the a FIN from the client.
requestLineStatus = TakeStartLine(SocketInput);
requestLineStatus = TakeStartLine(Input);

if (requestLineStatus == RequestLineStatus.Empty)
{
Expand All @@ -69,28 +69,28 @@ public override async Task RequestProcessingAsync()
break;
}

await SocketInput;
await Input;
}

InitializeHeaders();

while (!_requestProcessingStopping && !TakeMessageHeaders(SocketInput, FrameRequestHeaders))
while (!_requestProcessingStopping && !TakeMessageHeaders(Input, FrameRequestHeaders))
{
if (SocketInput.CheckFinOrThrow())
if (Input.CheckFinOrThrow())
{
// We need to attempt to consume start lines and headers even after
// SocketInput.RemoteIntakeFin is set to true to ensure we don't close a
// connection without giving the application a chance to respond to a request
// sent immediately before the a FIN from the client.
if (!TakeMessageHeaders(SocketInput, FrameRequestHeaders))
if (!TakeMessageHeaders(Input, FrameRequestHeaders))
{
RejectRequest(RequestRejectionReason.MalformedRequestInvalidHeaders);
}

break;
}

await SocketInput;
await Input;
}

if (!_requestProcessingStopping)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ private void TryProduceContinue()

private void ConsumedBytes(int count)
{
var scan = _context.SocketInput.ConsumingStart();
var scan = _context.Input.ConsumingStart();
scan.Skip(count);
_context.SocketInput.ConsumingComplete(scan, scan);
_context.Input.ConsumingComplete(scan, scan);

OnConsumedBytes(count);
}
Expand Down Expand Up @@ -305,7 +305,7 @@ public ForRemainingData(bool upgrade, Frame context)

protected override ValueTask<ArraySegment<byte>> PeekAsync(CancellationToken cancellationToken)
{
return _context.SocketInput.PeekAsync();
return _context.Input.PeekAsync();
}
}

Expand All @@ -330,7 +330,7 @@ protected override ValueTask<ArraySegment<byte>> PeekAsync(CancellationToken can
return new ValueTask<ArraySegment<byte>>();
}

var task = _context.SocketInput.PeekAsync();
var task = _context.Input.PeekAsync();

if (task.IsCompleted)
{
Expand Down Expand Up @@ -402,7 +402,7 @@ public ForChunkedEncoding(bool keepAlive, FrameRequestHeaders headers, Frame con
: base(context)
{
RequestKeepAlive = keepAlive;
_input = _context.SocketInput;
_input = _context.Input;
_requestHeaders = headers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void Setup()
SocketInput = new SocketInput(MemoryPool, ThreadPool);

var connectionContext = new MockConnection(new KestrelServerOptions());
connectionContext.SocketInput = SocketInput;
connectionContext.Input = SocketInput;

Frame = new Frame<object>(application: null, context: connectionContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ await context.Thread.PostAsync(_ =>
Libuv.uv_buf_t ignored;
mockLibuv.AllocCallback(socket.InternalGetHandle(), 2048, out ignored);
mockLibuv.ReadCallback(socket.InternalGetHandle(), 0, ref ignored);
Assert.False(connection.SocketInput.CheckFinOrThrow());
Assert.False(connection.Input.CheckFinOrThrow());
}, null);

connection.ConnectionControl.End(ProduceEndType.SocketDisconnect);
Expand Down
Loading

0 comments on commit e55c624

Please sign in to comment.