Skip to content

Commit

Permalink
Speed up of framing spec
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Dec 19, 2021
1 parent 0710c0c commit df48ca5
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 62 deletions.
114 changes: 61 additions & 53 deletions src/core/Akka.Streams.Tests/Dsl/FramingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Text;
using System.Threading.Tasks;
using Akka.IO;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Fusing;
Expand Down Expand Up @@ -259,72 +260,79 @@ private static ByteString EncodeComplexFrame(
[Fact]
public void Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets()
{
var counter = 1;
foreach (var byteOrder in ByteOrders)
IEnumerable<Task<(IEnumerable<ByteString>, List<ByteString>, (ByteOrder, int, int))>> GetFutureResults()
{
foreach (var byteOrder in ByteOrders)
foreach (var fieldOffset in FieldOffsets)
foreach (var fieldLength in FieldLengths)
{
foreach (var fieldLength in FieldLengths)
var encodedFrames = FrameLengths.Where(x => x < 1L << (fieldLength * 8)).Select(length =>
{
var encodedFrames = FrameLengths.Where(x => x < 1L << (fieldLength * 8)).Select(length =>
{
var payload = ReferenceChunk.Slice(0, length);
return Encode(payload, fieldOffset, fieldLength, byteOrder);
}).ToList();

var task = Source.From(encodedFrames)
.Via(Rechunk)
.Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder))
.Grouped(10000)
.RunWith(Sink.First<IEnumerable<ByteString>>(), Materializer);

task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Result.Should().BeEquivalentTo(encodedFrames);

_helper.WriteLine($"{counter++} from 80 passed");
}
var payload = ReferenceChunk.Slice(0, length);
return Encode(payload, fieldOffset, fieldLength, byteOrder);
}).ToList();

yield return Source.From(encodedFrames)
.Via(Rechunk)
.Via(Framing.LengthField(fieldLength, int.MaxValue, fieldOffset, byteOrder))
.Grouped(10000)
.RunWith(Sink.First<IEnumerable<ByteString>>(), Materializer)
.ContinueWith(t => (t.Result, encodedFrames, (byteOrder, fieldOffset, fieldLength)));
}
}
}


Parallel.ForEach(GetFutureResults(), async futureResult =>
{
var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult;
result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}");
});
}

[Fact]
public void Length_field_based_framing_must_work_with_various_byte_orders_frame_lengths_and_offsets_using_ComputeFrameSize()
{
foreach (var byteOrder in ByteOrders)
foreach (var fieldOffset in FieldOffsets)
foreach (var fieldLength in FieldLengths)
IEnumerable<Task<(IEnumerable<ByteString>, List<ByteString>, (ByteOrder, int, int))>> GetFutureResults()
{
int ComputeFrameSize(IReadOnlyList<byte> offset, int length)
foreach (var byteOrder in ByteOrders)
foreach (var fieldOffset in FieldOffsets)
foreach (var fieldLength in FieldLengths)
{
var sizeWithoutTail = offset.Count + fieldLength + length;
return offset.Count > 0 ? offset[0] + sizeWithoutTail : sizeWithoutTail;
}
int ComputeFrameSize(IReadOnlyList<byte> offset, int length)
{
var sizeWithoutTail = offset.Count + fieldLength + length;
return offset.Count > 0 ? offset[0] + sizeWithoutTail : sizeWithoutTail;
}

var random= new Random();
byte[] Offset()
{
var arr = new byte[fieldOffset];
if (arr.Length > 0) arr[0] = Convert.ToByte(random.Next(128));
return arr;
var random = new Random();
byte[] Offset()
{
var arr = new byte[fieldOffset];
if (arr.Length > 0) arr[0] = Convert.ToByte(random.Next(128));
return arr;
}

var encodedFrames = FrameLengths.Where(x => x < 1L << (fieldLength * 8)).Select(length =>
{
var payload = ReferenceChunk.Slice(0, length);
var offsetBytes = Offset();
var tailBytes = offsetBytes.Length > 0 ? new byte[offsetBytes[0]] : Array.Empty<byte>();
return EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(offsetBytes), ByteString.FromBytes(tailBytes));
}).ToList();

yield return Source.From(encodedFrames)
.Via(Rechunk)
.Via(Framing.LengthField(fieldLength, fieldOffset, int.MaxValue, byteOrder, ComputeFrameSize))
.Grouped(10000)
.RunWith(Sink.First<IEnumerable<ByteString>>(), Materializer)
.ContinueWith(t => (t.Result, encodedFrames, (byteOrder, fieldOffset, fieldLength)));
}

var encodedFrames = FrameLengths.Where(x => x < 1L << (fieldLength * 8)).Select(length =>
{
var payload = ReferenceChunk.Slice(0, length);
var offsetBytes = Offset();
var tailBytes = offsetBytes.Length > 0 ? new byte[offsetBytes[0]] : Array.Empty<byte>();
return EncodeComplexFrame(payload, fieldLength, byteOrder, ByteString.FromBytes(offsetBytes), ByteString.FromBytes(tailBytes));
}).ToList();

var task = Source.From(encodedFrames)
.Via(Rechunk)
.Via(Framing.LengthField(fieldLength, fieldOffset, int.MaxValue, byteOrder, ComputeFrameSize))
.Grouped(10000)
.RunWith(Sink.First<IEnumerable<ByteString>>(), Materializer);

task.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue();
task.Result.Should().BeEquivalentTo(encodedFrames);
}

Parallel.ForEach(GetFutureResults(), async futureResult =>
{
var (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) = await futureResult;
result.ShouldBeSame(encodedFrames, $"byteOrder: {byteOrder}, fieldOffset: {fieldOffset}, fieldLength: {fieldLength}");
});
}

[Fact]
Expand Down
18 changes: 9 additions & 9 deletions src/core/Akka.Streams/Dsl/Framing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -483,26 +483,26 @@ private void TryPull()
private readonly Func<IEnumerator<byte>, int, int> _intDecoder;
private readonly Option<Func<IReadOnlyList<byte>, int, int>> _computeFrameSize;

// For the sake of binary compatibility
public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, ByteOrder byteOrder)
: this(lengthFieldLength, maximumFramelength, lengthFieldOffset, byteOrder, Option<Func<IReadOnlyList<byte>, int, int>>.None)
{ }

public LengthFieldFramingStage(
int lengthFieldLength,
int lengthFieldLength,
int maximumFrameLength,
int lengthFieldOffset,
int lengthFieldOffset,
ByteOrder byteOrder,
Option<Func<IReadOnlyList<byte>, int, int>> computeFrameSize) : base("LengthFieldFramingStage")
{
_lengthFieldLength = lengthFieldLength;
_lengthFieldLength = lengthFieldLength;
_maximumFramelength = maximumFrameLength;
_lengthFieldOffset = lengthFieldOffset;
_lengthFieldOffset = lengthFieldOffset;
_minimumChunkSize = lengthFieldOffset + lengthFieldLength;
_computeFrameSize = computeFrameSize;
_intDecoder = byteOrder == ByteOrder.BigEndian ? BigEndianDecoder : LittleEndianDecoder;
}

// For the sake of binary compatibility
public LengthFieldFramingStage(int lengthFieldLength, int maximumFramelength, int lengthFieldOffset, ByteOrder byteOrder)
: this(lengthFieldLength, maximumFramelength, lengthFieldOffset, byteOrder, Option<Func<IReadOnlyList<byte>, int, int>>.None)
{ }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
}
}
Expand Down

0 comments on commit df48ca5

Please sign in to comment.