Skip to content

Commit

Permalink
spanification of WireFormatting & Reader/Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
bollhals committed Jun 17, 2020
1 parent 0f5425a commit e593e36
Show file tree
Hide file tree
Showing 18 changed files with 374 additions and 362 deletions.
29 changes: 24 additions & 5 deletions projects/Apigen/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,28 @@ public void EmitClassMethodImplementations(AmqpClass c)
EmitLine("");
EmitLine(" public override void WriteArgumentsTo(ref Client.Impl.MethodArgumentWriter writer)");
EmitLine(" {");
var lastWasBitClass = false;
foreach (AmqpField f in m.m_Fields)
{
EmitLine($" writer.Write{MangleClass(ResolveDomain(f.Domain))}(_{MangleMethod(f.Name)});");
string mangleClass = MangleClass(ResolveDomain(f.Domain));
if (mangleClass != "Bit")
{
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
lastWasBitClass = false;
}
}
else
{
lastWasBitClass = true;
}

EmitLine($" writer.Write{mangleClass}(_{MangleMethod(f.Name)});");
}
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
}
EmitLine(" }");
EmitLine("");
Expand Down Expand Up @@ -933,14 +952,14 @@ public void EmitClassMethodImplementations(AmqpClass c)

public void EmitMethodArgumentReader()
{
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
EmitLine(" {");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
EmitLine(" if(result != null)");
EmitLine(" {");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
EmitLine(" result.ReadArgumentsFrom(ref reader);");
EmitLine(" return result;");
EmitLine(" }");
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<MinVerVerbosity>minimal</MinVerVerbosity>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>..\..\packages</PackageOutputPath>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(CONCOURSE_CI_BUILD)' == 'true'">
Expand Down
8 changes: 5 additions & 3 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@ public Command HandleFrame(in InboundFrame f)
{
throw new UnexpectedFrameException(f.Type);
}
m_method = m_protocol.DecodeMethodFrom(f.Payload);
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
return CompletedCommand();
case AssemblyState.ExpectingContentHeader:
if (!f.IsHeader())
{
throw new UnexpectedFrameException(f.Type);
}
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));

ReadOnlySpan<byte> span = f.Payload.Span;
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
ulong totalBodyBytes = m_header.ReadFrom(span.Slice(2));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(f.Type);
Expand Down
11 changes: 5 additions & 6 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public virtual object Clone()
///<summary>
/// Fill this instance from the given byte buffer stream.
///</summary>
internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
internal ulong ReadFrom(ReadOnlySpan<byte> span)
{
// Skipping the first two bytes since they arent used (weight - not currently used)
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2).Span);
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(memory.Slice(10));
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(span.Slice(2));
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span.Slice(10));
ReadPropertiesFrom(ref reader);
return bodySize;
}
Expand All @@ -81,13 +81,12 @@ internal ulong ReadFrom(ReadOnlyMemory<byte> memory)

private const ushort ZERO = 0;

internal int WriteTo(Memory<byte> memory, ulong bodySize)
internal int WriteTo(Span<byte> span, ulong bodySize)
{
var span = memory.Span;
NetworkOrderSerializer.WriteUInt16(span, ZERO); // Weight - not used
NetworkOrderSerializer.WriteUInt64(span.Slice(2), bodySize);

ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(span.Slice(10));
WritePropertiesTo(ref writer);
return 10 + writer.Offset;
}
Expand Down
77 changes: 39 additions & 38 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderPropertyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,28 @@

namespace RabbitMQ.Client.Impl
{
internal struct ContentHeaderPropertyReader
internal ref struct ContentHeaderPropertyReader
{
private ushort m_bitCount;
private ushort m_flagWord;
private int _memoryOffset;
private readonly ReadOnlyMemory<byte> _memory;
private const int StartBitMask = 0b1000_0000_0000_0000;
private const int EndBitMask = 0b0000_0000_0000_0001;

public ContentHeaderPropertyReader(ReadOnlyMemory<byte> memory)
{
_memory = memory;
_memoryOffset = 0;
m_flagWord = 1; // just the continuation bit
m_bitCount = 15; // the correct position to force a m_flagWord read
}
private readonly ReadOnlySpan<byte> _span;
private int _offset;
private int _bitMask;
private int _bits;

public bool ContinuationBitSet
private ReadOnlySpan<byte> Span => _span.Slice(_offset);

public ContentHeaderPropertyReader(ReadOnlySpan<byte> span)
{
get { return (m_flagWord & 1) != 0; }
_span = span;
_offset = 0;
_bitMask = EndBitMask; // force a flag read
_bits = 1; // just the continuation bit
}

private bool ContinuationBitSet => (_bits & EndBitMask) != 0;

public void FinishPresence()
{
if (ContinuationBitSet)
Expand All @@ -78,82 +80,81 @@ public bool ReadBit()
return ReadPresence();
}

public void ReadFlagWord()
private void ReadBits()
{
if (!ContinuationBitSet)
{
throw new MalformedFrameException("Attempted to read flag word when none advertised");
}
m_flagWord = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 2;
m_bitCount = 0;
_bits = NetworkOrderDeserializer.ReadUInt16(Span);
_offset += 2;
_bitMask = StartBitMask;
}

public uint ReadLong()
{
uint result = NetworkOrderDeserializer.ReadUInt32(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 4;
uint result = NetworkOrderDeserializer.ReadUInt32(Span);
_offset += 4;
return result;
}

public ulong ReadLonglong()
{
ulong result = NetworkOrderDeserializer.ReadUInt64(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 8;
ulong result = NetworkOrderDeserializer.ReadUInt64(Span);
_offset += 8;
return result;
}

public byte[] ReadLongstr()
{
byte[] result = WireFormatting.ReadLongstr(_memory.Slice(_memoryOffset));
_memoryOffset += 4 + result.Length;
byte[] result = WireFormatting.ReadLongstr(Span);
_offset += 4 + result.Length;
return result;
}

public byte ReadOctet()
{
return _memory.Span[_memoryOffset++];
return _span[_offset++];
}

public bool ReadPresence()
{
if (m_bitCount == 15)
if (_bitMask == EndBitMask)
{
ReadFlagWord();
ReadBits();
}

int bit = 15 - m_bitCount;
bool result = (m_flagWord & (1 << bit)) != 0;
m_bitCount++;
bool result = (_bits & _bitMask) != 0;
_bitMask >>= 1;
return result;
}

public ushort ReadShort()
{
ushort result = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset).Span);
_memoryOffset += 2;
ushort result = NetworkOrderDeserializer.ReadUInt16(Span);
_offset += 2;
return result;
}

public string ReadShortstr()
{
string result = WireFormatting.ReadShortstr(_memory.Slice(_memoryOffset), out int bytesRead);
_memoryOffset += bytesRead;
string result = WireFormatting.ReadShortstr(Span, out int bytesRead);
_offset += bytesRead;
return result;
}

/// <returns>A type of <seealso cref="System.Collections.Generic.IDictionary{TKey,TValue}"/>.</returns>
public Dictionary<string, object> ReadTable()
{
Dictionary<string, object> result = WireFormatting.ReadTable(_memory.Slice(_memoryOffset), out int bytesRead);
_memoryOffset += bytesRead;
Dictionary<string, object> result = WireFormatting.ReadTable(Span, out int bytesRead);
_offset += bytesRead;
return result;
}

public AmqpTimestamp ReadTimestamp()
{
AmqpTimestamp result = WireFormatting.ReadTimestamp(_memory.Slice(_memoryOffset));
_memoryOffset += 8;
AmqpTimestamp result = WireFormatting.ReadTimestamp(Span);
_offset += 8;
return result;
}
}
Expand Down
67 changes: 38 additions & 29 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderPropertyWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,31 @@

namespace RabbitMQ.Client.Impl
{
struct ContentHeaderPropertyWriter
internal ref struct ContentHeaderPropertyWriter
{
private int _bitCount;
private ushort _flagWord;
public int Offset { get; private set; }
public Memory<byte> Memory { get; private set; }
private const ushort StartBitMask = 0b1000_0000_0000_0000;
private const ushort EndBitMask = 0b0000_0000_0000_0001;

public ContentHeaderPropertyWriter(Memory<byte> memory)
private readonly Span<byte> _span;
private int _offset;
private ushort _bitAccumulator;
private ushort _bitMask;

public int Offset => _offset;

private Span<byte> Span => _span.Slice(_offset);

public ContentHeaderPropertyWriter(Span<byte> span)
{
Memory = memory;
_flagWord = 0;
_bitCount = 0;
Offset = 0;
_span = span;
_offset = 0;
_bitAccumulator = 0;
_bitMask = StartBitMask;
}

public void FinishPresence()
{
EmitFlagWord(false);
WriteBits();
}

public void WriteBit(bool bit)
Expand All @@ -72,65 +79,67 @@ public void WriteBit(bool bit)

public void WriteLong(uint val)
{
Offset += WireFormatting.WriteLong(Memory.Slice(Offset), val);
_offset += WireFormatting.WriteLong(Span, val);
}

public void WriteLonglong(ulong val)
{
Offset += WireFormatting.WriteLonglong(Memory.Slice(Offset), val);
_offset += WireFormatting.WriteLonglong(Span, val);
}

public void WriteLongstr(byte[] val)
{
Offset += WireFormatting.WriteLongstr(Memory.Slice(Offset), val);
_offset += WireFormatting.WriteLongstr(Span, val);
}

public void WriteOctet(byte val)
{
Memory.Slice(Offset++).Span[0] = val;
_span[_offset++] = val;
}

public void WritePresence(bool present)
{
if (_bitCount == 15)
if (_bitMask == EndBitMask)
{
EmitFlagWord(true);
// Mark continuation
_bitAccumulator |= _bitMask;
WriteBits();
}

if (present)
{
int bit = 15 - _bitCount;
_flagWord = (ushort)(_flagWord | (1 << bit));
_bitAccumulator |= _bitMask;
}
_bitCount++;

_bitMask >>= 1;
}

public void WriteShort(ushort val)
{
Offset += WireFormatting.WriteShort(Memory.Slice(Offset), val);
_offset += WireFormatting.WriteShort(Span, val);
}

public void WriteShortstr(string val)
{
Offset += WireFormatting.WriteShortstr(Memory.Slice(Offset), val);
_offset += WireFormatting.WriteShortstr(Span, val);
}

public void WriteTable(IDictionary<string, object> val)
{
Offset += WireFormatting.WriteTable(Memory.Slice(Offset), val);
_offset += WireFormatting.WriteTable(Span, val);
}

public void WriteTimestamp(AmqpTimestamp val)
{
Offset += WireFormatting.WriteTimestamp(Memory.Slice(Offset), val);
_offset += WireFormatting.WriteTimestamp(Span, val);
}

private void EmitFlagWord(bool continuationBit)
private void WriteBits()
{
NetworkOrderSerializer.WriteUInt16(Memory.Slice(Offset).Span, (ushort)(continuationBit ? (_flagWord | 1) : _flagWord));
Offset += 2;
_flagWord = 0;
_bitCount = 0;
NetworkOrderSerializer.WriteUInt16(Span, _bitAccumulator);
_offset += 2;
_bitMask = StartBitMask;
_bitAccumulator = 0;
}
}
}
Loading

0 comments on commit e593e36

Please sign in to comment.