Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8.0: implement BasicPublishMemory #990

Merged
merged 1 commit into from
Dec 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions projects/Benchmarks/WireFormatting/MethodSerialization.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;

using System.Text;
using BenchmarkDotNet.Attributes;

using RabbitMQ.Client.Framing;
Expand Down Expand Up @@ -31,17 +31,33 @@ public class MethodBasicAck : MethodSerializationBase

public class MethodBasicDeliver : MethodSerializationBase
{
private readonly BasicDeliver _basicDeliver = new BasicDeliver(string.Empty, 0, false, string.Empty, string.Empty);
public override void SetUp() => _basicDeliver.WriteArgumentsTo(_buffer.Span);
private const string StringValue = "Exchange_OR_RoutingKey";
private readonly BasicPublish _basicPublish = new BasicPublish(default, StringValue, StringValue, false, false);
private readonly BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);

public override void SetUp()
{
int offset = RabbitMQ.Client.Impl.WireFormatting.WriteShortstr(_buffer.Span, string.Empty);
offset += RabbitMQ.Client.Impl.WireFormatting.WriteLonglong(_buffer.Slice(offset).Span, 0);
offset += RabbitMQ.Client.Impl.WireFormatting.WriteBits(_buffer.Slice(offset).Span, false);
offset += RabbitMQ.Client.Impl.WireFormatting.WriteShortstr(_buffer.Slice(offset).Span, string.Empty);
RabbitMQ.Client.Impl.WireFormatting.WriteShortstr(_buffer.Slice(offset).Span, string.Empty);
}

[Benchmark]
public object BasicDeliverRead() => new BasicDeliver(_buffer.Span);

[Benchmark]
public int BasicDeliverWrite() => _basicDeliver.WriteArgumentsTo(_buffer.Span);
public int BasicPublishWrite() => _basicPublish.WriteArgumentsTo(_buffer.Span);

[Benchmark]
public int BasicPublishMemoryWrite() => _basicPublishMemory.WriteArgumentsTo(_buffer.Span);

[Benchmark]
public int BasicPublishSize() => _basicPublish.GetRequiredBufferSize();

[Benchmark]
public int BasicDeliverSize() => _basicDeliver.GetRequiredBufferSize();
public int BasicPublishMemorySize() => _basicPublishMemory.GetRequiredBufferSize();
}

public class MethodChannelClose : MethodSerializationBase
Expand Down
63 changes: 63 additions & 0 deletions projects/RabbitMQ.Client/client/api/CachedString.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Text;

namespace RabbitMQ.Client
{
/// <summary>
/// Caches a string's byte representation to be used for certain methods like <see cref="IModel.BasicPublish(CachedString,CachedString,bool,IBasicProperties,ReadOnlyMemory{byte})"/>.
/// </summary>
public sealed class CachedString
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bollhals what is the benefit of exposing this as method parameters vs exposing the CLR type ReadonlyMemory? At first glance it looks like only convenience right? So would it make sense to make the public Api just accepting the CLR type instead? I'm asking because even with cached string the caller of the API needs to be concerned with caching the instances so the value of this class is really only abstracting the encoding get bytes which I find not enough to expose a custom type. A middle ground could be to expose the type but make it convertible to readonly memory while still switching the public Api to readonly memory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope my explanation is reasonable and what I wrote is understandable 😄
This was one thing which I was not 100% certain what the best option might be. In the end I've exposed it as CachedString due to:

  • You should only use this method if you actually cache the string (Otherwise you have worse performance)
  • Exposing ROM enables customer to send non UTF8 values, which might cause more harm than good. (In which case would you have a valid ROM but not a string? (Stackalloc is also not possible since it's Memory not Span))
  • In the future (comment from other PR) we could start returning the CachedString in other methods like BasicConsume or QueueDeclare, which then again could directly be used again in follow up methods for better performance.
  • CachedString ist more clearly explaining the intent of the method than ROM + comments explaining it has to be a UTF8 sting

Let's switch it around, what impact would you get from exposing just ROM?
Which use cases could you see which taking a ROM is preferable than CachedString?

The intent of this API is better performance due to not having to encode the same string over and over again which is done by caching the encoded value, which implies that the user is caching the value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok got it. These are really the strongest reason to expose a custom type

  • You should only use this method if you actually cache the string (Otherwise you have worse performance)
  • Exposing ROM enables customer to send non UTF8 values, which might cause more harm than good. (In which case would you have a valid ROM but not a string? (Stackalloc is also not possible since it's Memory not Span))

Which use cases could you see which taking a ROM is preferable than CachedString?

My thoughts were comming from the experience that when you have a .NET type available to expose a .NET type where it makes sense because it is one API less you have to maintain and potentially evolve. But like mentioned above the two major bulllets seem to indicate a strong custom type makes sense.

Regarding

  • CachedString ist more clearly explaining the intent of the method than ROM + comments explaining it has to be a UTF8 string

Could we add some validation into the constructor of that type or at least call it CachedUTF8String or something similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CachedString ist more clearly explaining the intent of the method than ROM + comments explaining it has to be a UTF8 string

Could we add some validation into the constructor of that type or at least call it CachedUTF8String or something similar?

What kind of validation?
Sure we can rename it, but I'm not favoring UTF8 in the name as the type kind of abstracting it away so you don't need to care what it is. Adding it in the name makes it clear what it will use, but for what purpose?

{
public static readonly CachedString Empty = new CachedString(string.Empty, ReadOnlyMemory<byte>.Empty);

/// <summary>
/// The string value to cache.
/// </summary>
public readonly string Value;
/// <summary>
/// Gets the bytes representing the <see cref="Value"/>.
/// </summary>
public readonly ReadOnlyMemory<byte> Bytes;

/// <summary>
/// Creates a new <see cref="CachedString"/> based on the provided string.
/// </summary>
/// <param name="value">The string to cache.</param>
public CachedString(string value)
{
Value = value;
Bytes = Encoding.UTF8.GetBytes(value);
}

/// <summary>
/// Creates a new <see cref="CachedString"/> based on the provided bytes.
/// </summary>
/// <param name="bytes">The bytes.</param>
public CachedString(ReadOnlyMemory<byte> bytes)
{
#if !NETSTANDARD
Value = Encoding.UTF8.GetString(bytes.Span);
#else
unsafe
{
fixed (byte* bytePointer = bytes.Span)
{
Value = Encoding.UTF8.GetString(bytePointer, bytes.Length);
}
}
#endif
Bytes = bytes;
}

/// <summary>
/// Creates a new <see cref="CachedString"/> based on the provided values.
/// </summary>
/// <param name="value">The string to cache.</param>
/// <param name="bytes">The byte representation of the string value.</param>
public CachedString(string value, ReadOnlyMemory<byte> bytes)
{
Value = value;
Bytes = bytes;
}
}
}
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/client/api/IBasicPublishBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace RabbitMQ.Client
public interface IBasicPublishBatch
{
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, ReadOnlyMemory<byte> body);
void Add(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);
void Publish();
}
}
9 changes: 9 additions & 0 deletions projects/RabbitMQ.Client/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ string BasicConsume(
/// </para>
/// </remarks>
void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);
/// <summary>
/// Publishes a message.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
void BasicPublish(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);

/// <summary>
/// Configures QoS parameters of the Basic content-class.
Expand Down
11 changes: 11 additions & 0 deletions projects/RabbitMQ.Client/client/api/IModelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ public static void BasicPublish(this IModel model, PublicationAddress addr, IBas
model.BasicPublish(addr.ExchangeName, addr.RoutingKey, basicProperties: basicProperties, body: body);
}

/// <summary>
/// (Extension method) Convenience overload of BasicPublish.
/// </summary>
/// <remarks>
/// The publication occurs with mandatory=false
/// </remarks>
public static void BasicPublish(this IModel model, CachedString exchange, CachedString routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
model.BasicPublish(exchange, routingKey, false, basicProperties, body);
}

/// <summary>
/// (Extension method) Convenience overload of BasicPublish.
/// </summary>
Expand Down
38 changes: 37 additions & 1 deletion projects/RabbitMQ.Client/client/framing/BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//---------------------------------------------------------------------------

using System;
using System.Text;
using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -85,4 +84,41 @@ public override int GetRequiredBufferSize()
return bufferSize;
}
}

internal sealed class BasicPublishMemory : Client.Impl.MethodBase
{
/* unused, therefore commented out
* public readonly ushort _reserved1;
*/
public readonly ReadOnlyMemory<byte> _exchange;
public readonly ReadOnlyMemory<byte> _routingKey;
public readonly bool _mandatory;
public readonly bool _immediate;

public BasicPublishMemory(ReadOnlyMemory<byte> Exchange, ReadOnlyMemory<byte> RoutingKey, bool Mandatory, bool Immediate)
{
_exchange = Exchange;
_routingKey = RoutingKey;
_mandatory = Mandatory;
_immediate = Immediate;
}

public override ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicPublish;
public override string ProtocolMethodName => "basic.publish";
public override bool HasContent => true;

public override int WriteArgumentsTo(Span<byte> span)
{
int offset = WireFormatting.WriteShort(span, 0);
offset += WireFormatting.WriteShortstr(span.Slice(offset), _exchange.Span);
offset += WireFormatting.WriteShortstr(span.Slice(offset), _routingKey.Span);
return offset + WireFormatting.WriteBits(span.Slice(offset), _mandatory, _immediate);
}

public override int GetRequiredBufferSize()
{
return 2 + 1 + 1 + 1 + // bytes for _reserved1, length of _exchange, length of _routingKey, bit fields
_exchange.Length + _routingKey.Length;
}
}
}
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/client/framing/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public override void _Private_BasicPublish(string exchange, string routingKey, b
ModelSend(new BasicPublish(default, exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body);
}

public override void _Private_BasicPublishMemory(ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
ModelSend(new BasicPublishMemory(exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body);
}

public override void _Private_BasicRecover(bool requeue)
{
ModelSend(new BasicRecover(requeue));
Expand Down
3 changes: 3 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,9 @@ public void BasicPublish(string exchange,
body);
}

public void BasicPublish(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
=> Delegate.BasicPublish(exchange, routingKey, mandatory, basicProperties, body);

public void BasicQos(uint prefetchSize,
ushort prefetchCount,
bool global)
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

using System;
using System.Collections.Generic;

using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -61,7 +60,12 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper
_routingKey = routingKey,
_mandatory = mandatory
};
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
}

public void Add(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
var method = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
}

Expand Down
39 changes: 33 additions & 6 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,12 @@ public abstract void _Private_BasicPublish(string exchange,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body);

public abstract void _Private_BasicPublishMemory(ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
bool mandatory,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body);

public abstract void _Private_BasicRecover(bool requeue);

public abstract void _Private_ChannelClose(ushort replyCode,
Expand Down Expand Up @@ -1132,11 +1138,6 @@ public void BasicPublish(string exchange,
throw new ArgumentNullException(nameof(routingKey));
}

if (basicProperties is null)
{
basicProperties = _emptyBasicProperties;
}

if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
Expand All @@ -1157,10 +1158,36 @@ public void BasicPublish(string exchange,
_Private_BasicPublish(exchange,
routingKey,
mandatory,
basicProperties,
basicProperties ?? _emptyBasicProperties,
body);
}

public void BasicPublish(CachedString exchange,
CachedString routingKey,
bool mandatory,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
if (_deliveryTagsCountdown.IsSet)
{
_deliveryTagsCountdown.Reset(1);
}
else
{
_deliveryTagsCountdown.AddCount();
}

_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}

_Private_BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, basicProperties ?? _emptyBasicProperties, body);
}

public void UpdateSecret(string newSecret, string reason)
{
if (newSecret is null)
Expand Down
17 changes: 17 additions & 0 deletions projects/RabbitMQ.Client/client/impl/WireFormatting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,20 @@ public static int WriteShort(Span<byte> span, ushort val)
return 2;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteShortstr(Span<byte> span, ReadOnlySpan<byte> value)
{
var length = value.Length;
if (length <= byte.MaxValue)
{
span[0] = (byte)length;
value.CopyTo(span.Slice(1));
return length + 1;
}

return ThrowArgumentTooLong(length);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteShortstr(Span<byte> span, string val)
{
Expand Down Expand Up @@ -953,6 +967,9 @@ public static int WriteTimestamp(Span<byte> span, AmqpTimestamp val)
return WriteLonglong(span, (ulong)val.UnixTime);
}

public static int ThrowArgumentTooLong(int length)
=> throw new ArgumentOutOfRangeException("value", $"Value exceeds the maximum allowed length of 255 bytes, was {length} long.");

public static int ThrowArgumentOutOfRangeException(int orig, int expected)
=> throw new ArgumentOutOfRangeException("span", $"Span has not enough space ({orig} instead of {expected})");

Expand Down
31 changes: 31 additions & 0 deletions projects/Unit/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,37 @@ public void TestBasicRoundtripArray()
}
}

[Test]
public void TestBasicRoundtripCachedString()
{
var cf = new ConnectionFactory();
using(IConnection c = cf.CreateConnection())
using(IModel m = c.CreateModel())
{
CachedString exchangeName = new CachedString(string.Empty);
CachedString queueName = new CachedString(m.QueueDeclare().QueueName);
IBasicProperties bp = m.CreateBasicProperties();
byte[] sendBody = System.Text.Encoding.UTF8.GetBytes("hi");
byte[] consumeBody = null;
var consumer = new EventingBasicConsumer(m);
var are = new AutoResetEvent(false);
consumer.Received += async (o, a) =>
{
consumeBody = a.Body.ToArray();
are.Set();
await Task.Yield();
};
string tag = m.BasicConsume(queueName.Value, true, consumer);

m.BasicPublish(exchangeName, queueName, bp, sendBody);
bool waitResFalse = are.WaitOne(2000);
m.BasicCancel(tag);

Assert.IsTrue(waitResFalse);
CollectionAssert.AreEqual(sendBody, consumeBody);
}
}

[Test]
public void TestBasicRoundtripReadOnlyMemory()
{
Expand Down
2 changes: 1 addition & 1 deletion projects/Unit/TestFloodPublishing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void TestUnthrottledFloodPublishing()
}
}

model.BasicPublish("", "", null, _body);
model.BasicPublish(CachedString.Empty, CachedString.Empty, null, _body);
}
}
finally
Expand Down
Loading