Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bytes to represent AMQP property map #39307

Merged
merged 5 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 145 additions & 145 deletions sdk/core/Azure.Core.Amqp/src/Shared/AmqpAnnotatedMessageConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public static AmqpAnnotatedMessage FromAmqpMessage(AmqpMessage source)

if (source.Properties.CorrelationId != null)
{
message.Properties.CorrelationId = new AmqpMessageId(source.Properties.CorrelationId.ToString());
message.Properties.CorrelationId = new AmqpMessageId(source.Properties.CorrelationId.ToString()!);
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
}

if (source.Properties.CreationTime.HasValue)
Expand All @@ -349,12 +349,12 @@ public static AmqpAnnotatedMessage FromAmqpMessage(AmqpMessage source)

if (source.Properties.MessageId != null)
{
message.Properties.MessageId = new AmqpMessageId(source.Properties.MessageId.ToString());
message.Properties.MessageId = new AmqpMessageId(source.Properties.MessageId.ToString()!);
}

if (source.Properties.ReplyTo != null)
{
message.Properties.ReplyTo = new AmqpAddress(source.Properties.ReplyTo.ToString());
message.Properties.ReplyTo = new AmqpAddress(source.Properties.ReplyTo.ToString()!);
}

if (!string.IsNullOrEmpty(source.Properties.ReplyToGroupId))
Expand All @@ -369,10 +369,10 @@ public static AmqpAnnotatedMessage FromAmqpMessage(AmqpMessage source)

if (source.Properties.To != null)
{
message.Properties.To = new AmqpAddress(source.Properties.To.ToString());
message.Properties.To = new AmqpAddress(source.Properties.To.ToString()!);
}

if (source.Properties.UserId != null)
if (source.Properties.UserId != default)
{
message.Properties.UserId = source.Properties.UserId;
}
Expand Down Expand Up @@ -433,142 +433,6 @@ public static AmqpAnnotatedMessage FromAmqpMessage(AmqpMessage source)
return message;
}

/// <summary>
/// Translates the data body segments into the corresponding set of
/// <see cref="Data" /> instances.
/// </summary>
///
/// <param name="dataBody">The data body to translate.</param>
///
/// <returns>The set of <see cref="Data" /> instances that represents the <paramref name="dataBody" />.</returns>
///
private static IEnumerable<Data> TranslateDataBody(IEnumerable<ReadOnlyMemory<byte>> dataBody)
{
foreach (var bodySegment in dataBody)
{
if (!MemoryMarshal.TryGetArray(bodySegment, out ArraySegment<byte> dataSegment))
{
dataSegment = new ArraySegment<byte>(bodySegment.ToArray());
}

yield return new Data
{
Value = dataSegment
};
}
}

/// <summary>
/// Translates the data body elements into the corresponding set of
/// <see cref="AmqpSequence" /> instances.
/// </summary>
///
/// <param name="sequenceBody">The sequence body to translate.</param>
///
/// <returns>The set of <see cref="AmqpSequence" /> instances that represents the <paramref name="sequenceBody" /> in AMQP format.</returns>
///
private static IEnumerable<AmqpSequence> TranslateSequenceBody(IEnumerable<IList<object>> sequenceBody)
{
foreach (var item in sequenceBody)
{
yield return new AmqpSequence((IList)item);
}
}

/// <summary>
/// Translates the data body into the corresponding set of
/// <see cref="AmqpValue" /> instance.
/// </summary>
///
/// <param name="valueBody">The sequence body to translate.</param>
///
/// <returns>The <see cref="AmqpValue" /> instance that represents the <paramref name="valueBody" /> in AMQP format.</returns>
///
private static AmqpValue TranslateValueBody(object valueBody)
{
if (TryCreateAmqpPropertyValueFromNetProperty(valueBody, out var amqpValue, allowBodyTypes: true))
{
return new AmqpValue { Value = amqpValue };
}

throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, "{0} is not a supported value body type.", valueBody.GetType().Name));
}

/// <summary>
/// Attempts to read the data body of an <see cref="AmqpMessage" />.
/// </summary>
///
/// <param name="source">The <see cref="AmqpMessage" /> to read from.</param>
/// <param name="dataBody">The value of the data body, if read.</param>
///
/// <returns><c>true</c> if the body was successfully read; otherwise, <c>false</c>.</returns>
///
private static bool TryGetDataBody(AmqpMessage source, out AmqpMessageBody? dataBody)
{
if (((source.BodyType & SectionFlag.Data) == 0) || (source.DataBody == null))
{
dataBody = null;
return false;
}

dataBody = AmqpMessageBody.FromData(MessageBody.FromDataSegments(source.DataBody));
return true;
}

/// <summary>
/// Attempts to read the sequence body of an <see cref="AmqpMessage" />.
/// </summary>
///
/// <param name="source">The <see cref="AmqpMessage" /> to read from.</param>
/// <param name="sequenceBody">The value of the sequence body, if read.</param>
///
/// <returns><c>true</c> if the body was successfully read; otherwise, <c>false</c>.</returns>
///
private static bool TryGetSequenceBody(AmqpMessage source, out AmqpMessageBody? sequenceBody)
{
if ((source.BodyType & SectionFlag.AmqpSequence) == 0)
{
sequenceBody = null;
return false;
}

var bodyContent = new List<IList<object>>();

foreach (var item in source.SequenceBody)
{
bodyContent.Add((IList<object>)item.List);
}

sequenceBody = AmqpMessageBody.FromSequence(bodyContent);
return true;
}

/// <summary>
/// Attempts to read the sequence body of an <see cref="AmqpMessage" />.
/// </summary>
///
/// <param name="source">The <see cref="AmqpMessage" /> to read from.</param>
/// <param name="valueBody">The value body, if read.</param>
///
/// <returns><c>true</c> if the body was successfully read; otherwise, <c>false</c>.</returns>
///
private static bool TryGetValueBody(AmqpMessage source, out AmqpMessageBody? valueBody)
{
if (((source.BodyType & SectionFlag.AmqpValue) == 0) || (source.ValueBody?.Value == null))
{
valueBody = null;
return false;
}

if (TryCreateNetPropertyFromAmqpProperty(source.ValueBody.Value, out var translatedValue, allowBodyTypes: true))
{
valueBody = AmqpMessageBody.FromValue(translatedValue!);
return true;
}

throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, "{0} is not a supported value body type.", source.ValueBody.Value.GetType().Name));
}

/// <summary>
/// Attempts to create an AMQP property value for a given event property.
/// </summary>
Expand All @@ -579,7 +443,7 @@ private static bool TryGetValueBody(AmqpMessage source, out AmqpMessageBody? val
///
/// <returns><c>true</c> if an AMQP property value was able to be created; otherwise, <c>false</c>.</returns>
///
private static bool TryCreateAmqpPropertyValueFromNetProperty(
public static bool TryCreateAmqpPropertyValueFromNetProperty(
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
object? propertyValue,
out object? amqpPropertyValue,
bool allowBodyTypes = false)
Expand Down Expand Up @@ -659,7 +523,7 @@ private static bool TryCreateAmqpPropertyValueFromNetProperty(
///
/// <returns><c>true</c> if a message property value was able to be created; otherwise, <c>false</c>.</returns>
///
private static bool TryCreateNetPropertyFromAmqpProperty(
public static bool TryCreateNetPropertyFromAmqpProperty(
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
object? amqpPropertyValue,
out object? convertedPropertyValue,
bool allowBodyTypes = false)
Expand Down Expand Up @@ -715,13 +579,13 @@ private static bool TryCreateNetPropertyFromAmqpProperty(
convertedPropertyValue = listOrArray;
break;

case ArraySegment<byte> segment when segment.Count == segment.Array.Length:
case ArraySegment<byte> segment when segment.Count == segment.Array!.Length:
convertedPropertyValue = segment.Array;
break;

case ArraySegment<byte> segment:
var buffer = new byte[segment.Count];
Buffer.BlockCopy(segment.Array, segment.Offset, buffer, 0, segment.Count);
Buffer.BlockCopy(segment.Array!, segment.Offset, buffer, 0, segment.Count);
convertedPropertyValue = buffer;
break;

Expand Down Expand Up @@ -750,6 +614,142 @@ private static bool TryCreateNetPropertyFromAmqpProperty(
return (convertedPropertyValue != null);
}

/// <summary>
/// Translates the data body segments into the corresponding set of
/// <see cref="Data" /> instances.
/// </summary>
///
/// <param name="dataBody">The data body to translate.</param>
///
/// <returns>The set of <see cref="Data" /> instances that represents the <paramref name="dataBody" />.</returns>
///
private static IEnumerable<Data> TranslateDataBody(IEnumerable<ReadOnlyMemory<byte>> dataBody)
{
foreach (var bodySegment in dataBody)
{
if (!MemoryMarshal.TryGetArray(bodySegment, out ArraySegment<byte> dataSegment))
{
dataSegment = new ArraySegment<byte>(bodySegment.ToArray());
}

yield return new Data
{
Value = dataSegment
};
}
}

/// <summary>
/// Translates the data body elements into the corresponding set of
/// <see cref="AmqpSequence" /> instances.
/// </summary>
///
/// <param name="sequenceBody">The sequence body to translate.</param>
///
/// <returns>The set of <see cref="AmqpSequence" /> instances that represents the <paramref name="sequenceBody" /> in AMQP format.</returns>
///
private static IEnumerable<AmqpSequence> TranslateSequenceBody(IEnumerable<IList<object>> sequenceBody)
{
foreach (var item in sequenceBody)
{
yield return new AmqpSequence((IList)item);
}
}

/// <summary>
/// Translates the data body into the corresponding set of
/// <see cref="AmqpValue" /> instance.
/// </summary>
///
/// <param name="valueBody">The sequence body to translate.</param>
///
/// <returns>The <see cref="AmqpValue" /> instance that represents the <paramref name="valueBody" /> in AMQP format.</returns>
///
private static AmqpValue TranslateValueBody(object valueBody)
{
if (TryCreateAmqpPropertyValueFromNetProperty(valueBody, out var amqpValue, allowBodyTypes: true))
{
return new AmqpValue { Value = amqpValue };
}

throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, "{0} is not a supported value body type.", valueBody.GetType().Name));
}

/// <summary>
/// Attempts to read the data body of an <see cref="AmqpMessage" />.
/// </summary>
///
/// <param name="source">The <see cref="AmqpMessage" /> to read from.</param>
/// <param name="dataBody">The value of the data body, if read.</param>
///
/// <returns><c>true</c> if the body was successfully read; otherwise, <c>false</c>.</returns>
///
private static bool TryGetDataBody(AmqpMessage source, out AmqpMessageBody? dataBody)
{
if (((source.BodyType & SectionFlag.Data) == 0) || (source.DataBody == null))
{
dataBody = null;
return false;
}

dataBody = AmqpMessageBody.FromData(MessageBody.FromDataSegments(source.DataBody));
return true;
}

/// <summary>
/// Attempts to read the sequence body of an <see cref="AmqpMessage" />.
/// </summary>
///
/// <param name="source">The <see cref="AmqpMessage" /> to read from.</param>
/// <param name="sequenceBody">The value of the sequence body, if read.</param>
///
/// <returns><c>true</c> if the body was successfully read; otherwise, <c>false</c>.</returns>
///
private static bool TryGetSequenceBody(AmqpMessage source, out AmqpMessageBody? sequenceBody)
{
if ((source.BodyType & SectionFlag.AmqpSequence) == 0)
{
sequenceBody = null;
return false;
}

var bodyContent = new List<IList<object>>();

foreach (var item in source.SequenceBody)
{
bodyContent.Add((IList<object>)item.List);
}

sequenceBody = AmqpMessageBody.FromSequence(bodyContent);
return true;
}

/// <summary>
/// Attempts to read the sequence body of an <see cref="AmqpMessage" />.
/// </summary>
///
/// <param name="source">The <see cref="AmqpMessage" /> to read from.</param>
/// <param name="valueBody">The value body, if read.</param>
///
/// <returns><c>true</c> if the body was successfully read; otherwise, <c>false</c>.</returns>
///
private static bool TryGetValueBody(AmqpMessage source, out AmqpMessageBody? valueBody)
{
if (((source.BodyType & SectionFlag.AmqpValue) == 0) || (source.ValueBody?.Value == null))
{
valueBody = null;
return false;
}

if (TryCreateNetPropertyFromAmqpProperty(source.ValueBody.Value, out var translatedValue, allowBodyTypes: true))
{
valueBody = AmqpMessageBody.FromValue(translatedValue!);
return true;
}

throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, "{0} is not a supported value body type.", source.ValueBody.Value.GetType().Name));
}

private static void ThrowSerializationFailed(string propertyName, KeyValuePair<string, object?> pair)
{
throw new NotSupportedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc
{
internal static class SettlementExtensions
{
internal static object GetPropertyValue(this SettlementProperties properties)
{
return properties.ValuesCase switch
{
SettlementProperties.ValuesOneofCase.LongValue => properties.LongValue,
SettlementProperties.ValuesOneofCase.UlongValue => properties.UlongValue,
SettlementProperties.ValuesOneofCase.DoubleValue => properties.DoubleValue,
SettlementProperties.ValuesOneofCase.FloatValue => properties.FloatValue,
SettlementProperties.ValuesOneofCase.IntValue => properties.IntValue,
SettlementProperties.ValuesOneofCase.UintValue => properties.UintValue,
SettlementProperties.ValuesOneofCase.BoolValue => properties.BoolValue,
SettlementProperties.ValuesOneofCase.StringValue => properties.StringValue,
_ => null
};
}
}
}
#endif
Loading