Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Initial changes to move to pipelines (#1424)
Browse files Browse the repository at this point in the history
- Change the Sockets abstraction from Channel<byte[]> to pipelines.

#615
  • Loading branch information
davidfowl authored Feb 10, 2018
1 parent f939a7c commit 28439d1
Show file tree
Hide file tree
Showing 33 changed files with 664 additions and 378 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.Extensions.Logging.Abstractions;

namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
Expand All @@ -26,12 +25,8 @@ public void GlobalSetup()

for (var i = 0; i < Connections; ++i)
{
var transportToApplication = Channel.CreateUnbounded<byte[]>(options);
var applicationToTransport = Channel.CreateUnbounded<byte[]>(options);

var application = ChannelConnection.Create<byte[]>(input: applicationToTransport, output: transportToApplication);
var transport = ChannelConnection.Create<byte[]>(input: transportToApplication, output: applicationToTransport);
var connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), transport, application);
var pair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);
var connection = new DefaultConnectionContext(Guid.NewGuid().ToString(), pair.Application, pair.Transport);

_hubLifetimeManager.OnConnectedAsync(new HubConnectionContext(connection, Timeout.InfiniteTimeSpan, NullLoggerFactory.Instance)).Wait();
}
Expand Down
1 change: 1 addition & 0 deletions build/dependencies.props
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<NewtonsoftJsonPackageVersion>10.0.1</NewtonsoftJsonPackageVersion>
<StackExchangeRedisStrongNamePackageVersion>1.2.4</StackExchangeRedisStrongNamePackageVersion>
<SystemBuffersPackageVersion>4.5.0-preview2-26130-01</SystemBuffersPackageVersion>
<SystemBuffersPrimitivesPackageVersion>0.1.0-preview2-180130-1</SystemBuffersPrimitivesPackageVersion>
<SystemIOPipelinesPackageVersion>0.1.0-preview2-180130-1</SystemIOPipelinesPackageVersion>
<SystemMemoryPackageVersion>4.5.0-preview2-26130-01</SystemMemoryPackageVersion>
<SystemNumericsVectorsPackageVersion>4.5.0-preview2-26130-01</SystemNumericsVectorsPackageVersion>
Expand Down
16 changes: 15 additions & 1 deletion client-ts/FunctionalTests/EchoEndPoint.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;

Expand All @@ -10,7 +11,20 @@ public class EchoEndPoint : EndPoint
{
public async override Task OnConnectedAsync(ConnectionContext connection)
{
await connection.Transport.Writer.WriteAsync(await connection.Transport.Reader.ReadAsync());
var result = await connection.Transport.Input.ReadAsync();
var buffer = result.Buffer;

try
{
if (!buffer.IsEmpty)
{
await connection.Transport.Output.WriteAsync(buffer.ToArray());
}
}
finally
{
connection.Transport.Input.AdvanceTo(result.Buffer.End);
}
}
}
}
3 changes: 2 additions & 1 deletion samples/SocialWeather/PersistentConnectionLifeTimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
using Microsoft.AspNetCore.Sockets.Features;
Expand Down Expand Up @@ -50,7 +51,7 @@ public async Task SendToAllAsync<T>(T data)
var ms = new MemoryStream();
await formatter.WriteAsync(data, ms);

connection.Transport.Writer.TryWrite(ms.ToArray());
await connection.Transport.Output.WriteAsync(ms.ToArray());
}
}

Expand Down
28 changes: 21 additions & 7 deletions samples/SocialWeather/SocialWeatherEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,29 @@ public async Task ProcessRequests(ConnectionContext connection)
var formatter = _formatterResolver.GetFormatter<WeatherReport>(
(string)connection.Metadata["format"]);

while (await connection.Transport.Reader.WaitToReadAsync())
while (true)
{
if (connection.Transport.Reader.TryRead(out var buffer))
var result = await connection.Transport.Input.ReadAsync();
var buffer = result.Buffer;
try
{
var stream = new MemoryStream();
await stream.WriteAsync(buffer, 0, buffer.Length);
stream.Position = 0;
var weatherReport = await formatter.ReadAsync(stream);
await _lifetimeManager.SendToAllAsync(weatherReport);
if (!buffer.IsEmpty)
{
var stream = new MemoryStream();
var data = buffer.ToArray();
await stream.WriteAsync(data, 0, data.Length);
stream.Position = 0;
var weatherReport = await formatter.ReadAsync(stream);
await _lifetimeManager.SendToAllAsync(weatherReport);
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
connection.Transport.Input.AdvanceTo(buffer.End);
}
}
}
Expand Down
29 changes: 22 additions & 7 deletions samples/SocketsSample/EndPoints/MessagesEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets;
Expand All @@ -20,14 +21,28 @@ public override async Task OnConnectedAsync(ConnectionContext connection)

try
{
while (await connection.Transport.Reader.WaitToReadAsync())
while (true)
{
if (connection.Transport.Reader.TryRead(out var buffer))
var result = await connection.Transport.Input.ReadAsync();
var buffer = result.Buffer;

try
{
if (!buffer.IsEmpty)
{
// We can avoid the copy here but we'll deal with that later
var text = Encoding.UTF8.GetString(buffer.ToArray());
text = $"{connection.ConnectionId}: {text}";
await Broadcast(Encoding.UTF8.GetBytes(text));
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
// We can avoid the copy here but we'll deal with that later
var text = Encoding.UTF8.GetString(buffer);
text = $"{connection.ConnectionId}: {text}";
await Broadcast(Encoding.UTF8.GetBytes(text));
connection.Transport.Input.AdvanceTo(buffer.End);
}
}
}
Expand All @@ -50,7 +65,7 @@ private Task Broadcast(byte[] payload)

foreach (var c in Connections)
{
tasks.Add(c.Transport.Writer.WriteAsync(payload));
tasks.Add(c.Transport.Output.WriteAsync(payload));
}

return Task.WhenAll(tasks);
Expand Down
2 changes: 1 addition & 1 deletion samples/SocketsSample/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:57707/",
"applicationUrl": "http://localhost:59847/",
"sslPort": 0
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Buffers;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using Microsoft.AspNetCore.SignalR.Internal.Encoders;
Expand All @@ -19,6 +21,15 @@ public HubProtocolReaderWriter(IHubProtocol hubProtocol, IDataEncoder dataEncode
_dataEncoder = dataEncoder;
}

public bool ReadMessages(ReadOnlyBuffer<byte> buffer, IInvocationBinder binder, out IList<HubMessage> messages, out SequencePosition consumed, out SequencePosition examined)
{
// TODO: Fix this implementation to be incremental
consumed = buffer.End;
examined = consumed;

return ReadMessages(buffer.ToArray(), binder, out messages);
}

public bool ReadMessages(byte[] input, IInvocationBinder binder, out IList<HubMessage> messages)
{
var buffer = _dataEncoder.Decode(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Collections;
using System.IO;
using Microsoft.AspNetCore.SignalR.Internal.Formatters;
using Newtonsoft.Json;
Expand Down Expand Up @@ -53,5 +55,27 @@ public static bool TryParseMessage(ReadOnlySpan<byte> input, out NegotiationMess
}
return true;
}

public static bool TryParseMessage(ReadOnlyBuffer<byte> buffer, out NegotiationMessage negotiationMessage, out SequencePosition consumed, out SequencePosition examined)
{
var separator = buffer.PositionOf(TextMessageFormatter.RecordSeparator);
if (separator == null)
{
// Haven't seen the entire negotiate message so bail
consumed = buffer.Start;
examined = buffer.End;
negotiationMessage = null;
return false;
}
else
{
consumed = buffer.GetPosition(separator.Value, 1);
examined = consumed;
}

var memory = buffer.IsSingleSegment ? buffer.First : buffer.ToArray();

return TryParseMessage(memory.Span, out negotiationMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
<PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
<PackageReference Include="System.Buffers" Version="$(SystemBuffersPackageVersion)" />
<PackageReference Include="System.Buffers.Primitives" Version="$(SystemBuffersPrimitivesPackageVersion)" />
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="$(SystemRuntimeCompilerServicesUnsafePackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Options" Version="$(MicrosoftExtensionsOptionsPackageVersion)" />
</ItemGroup>

Expand Down
Loading

0 comments on commit 28439d1

Please sign in to comment.