Skip to content

Commit

Permalink
Code CleanUps in Primitives / General (#350)
Browse files Browse the repository at this point in the history
* EventHubCode Clean ups

* Code CleanUps in Primitives

* Rename variable

* Leave string cast
  • Loading branch information
David Revoledo authored and serkantkaraca committed Nov 28, 2018
1 parent c5a9f05 commit 8aae6b6
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 209 deletions.
45 changes: 13 additions & 32 deletions src/Microsoft.Azure.EventHubs/EventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,18 @@ public EventData(ArraySegment<byte> arraySegment)
/// Get the actual Payload/Data wrapped by EventData.
/// This is intended to be used after receiving EventData using <see cref="PartitionReceiver"/>.
/// </summary>
public ArraySegment<byte> Body
{
get;
}
public ArraySegment<byte> Body { get; }

/// <summary>
/// Application property bag
/// </summary>
public IDictionary<string, object> Properties
{
get; internal set;
}
public IDictionary<string, object> Properties { get; internal set; }

/// <summary>
/// SystemProperties that are populated by EventHubService.
/// As these are populated by Service, they are only present on a Received EventData.
/// </summary>
public SystemPropertiesCollection SystemProperties
{
get; internal set;
}
public SystemPropertiesCollection SystemProperties { get; internal set; }

internal AmqpMessage AmqpMessage { get; set; }

Expand All @@ -104,10 +95,7 @@ void Dispose(bool disposing)
{
if (disposing)
{
if (this.AmqpMessage != null)
{
this.AmqpMessage.Dispose();
}
AmqpMessage?.Dispose();
}

disposed = true;
Expand All @@ -133,10 +121,8 @@ public long SequenceNumber
{
return (long)value;
}
else
{
throw new ArgumentException(Resources.MissingSystemProperty.FormatForUser(ClientConstants.SequenceNumberName));
}

throw new ArgumentException(Resources.MissingSystemProperty.FormatForUser(ClientConstants.SequenceNumberName));
}
}

Expand All @@ -151,10 +137,8 @@ public DateTime EnqueuedTimeUtc
{
return (DateTime)value;
}
else
{
throw new ArgumentException(Resources.MissingSystemProperty.FormatForUser(ClientConstants.EnqueuedTimeUtcName));
}

throw new ArgumentException(Resources.MissingSystemProperty.FormatForUser(ClientConstants.EnqueuedTimeUtcName));
}
}

Expand All @@ -170,10 +154,8 @@ public string Offset
{
return (string)value;
}
else
{
throw new ArgumentException(Resources.MissingSystemProperty.FormatForUser(ClientConstants.OffsetName));
}

throw new ArgumentException(Resources.MissingSystemProperty.FormatForUser(ClientConstants.OffsetName));
}
}

Expand All @@ -187,12 +169,11 @@ public string PartitionKey
{
return (string)value;
}
else
{
return null;
}

return null;
}
}
}
}
}

18 changes: 5 additions & 13 deletions src/Microsoft.Azure.EventHubs/EventDataBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public bool TryAdd(EventData eventData)
return true;
}

internal string PartitionKey
{
get; set;
}
internal string PartitionKey { get; set; }

long GetEventSizeForBatch(EventData eventData)
{
Expand All @@ -89,15 +86,10 @@ long GetEventSizeForBatch(EventData eventData)
eventData.AmqpMessage = amqpMessage;

// Calculate overhead depending on the message size.
if (eventData.AmqpMessage.SerializedMessageSize < 256)
{
// Overhead is smaller for messages smaller than 256 bytes.
return eventData.AmqpMessage.SerializedMessageSize + 5;
}
else
{
return eventData.AmqpMessage.SerializedMessageSize + 8;
}
// Overhead is smaller for messages smaller than 256 bytes.
long overhead = eventData.AmqpMessage.SerializedMessageSize < 256 ? 5 : 8;

return eventData.AmqpMessage.SerializedMessageSize + overhead;
}

/// <summary>
Expand Down
6 changes: 1 addition & 5 deletions src/Microsoft.Azure.EventHubs/EventDataSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ async Task<IEnumerable<EventData>> ProcessEvents(IEnumerable<EventData> eventDat
return processedEventList;
}

internal long MaxMessageSize
{
get;
set;
}
internal long MaxMessageSize { get; set; }
}
}
31 changes: 14 additions & 17 deletions src/Microsoft.Azure.EventHubs/EventHubsDiagnosticSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal static Activity StartSendActivity(string clientId, EventHubsConnectionS
DiagnosticListener.StartActivity(activity,
new
{
Endpoint = csb.Endpoint,
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas
Expand All @@ -87,7 +87,7 @@ internal static void FailSendActivity(Activity activity, EventHubsConnectionStri
DiagnosticListener.Write(SendActivityExceptionName,
new
{
Endpoint = csb.Endpoint,
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas,
Expand All @@ -105,19 +105,19 @@ internal static void StopSendActivity(Activity activity, EventHubsConnectionStri
DiagnosticListener.StopActivity(activity,
new
{
Endpoint = csb.Endpoint,
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas,
Status = sendTask?.Status
sendTask?.Status
});
}

internal static Activity StartReceiveActivity(
string clientId,
EventHubsConnectionStringBuilder csb,
string partitionKey,
string consumerGroup,
string clientId,
EventHubsConnectionStringBuilder csb,
string partitionKey,
string consumerGroup,
EventPosition eventPosition)
{
// skip if diagnostic source not enabled
Expand Down Expand Up @@ -168,7 +168,6 @@ internal static Activity StartReceiveActivity(
internal static void FailReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, string consumerGroup, Exception ex)
{
// TODO consider enriching activity with data from exception

if (!DiagnosticListener.IsEnabled() || !DiagnosticListener.IsEnabled(ReceiveActivityExceptionName))
{
return;
Expand All @@ -177,7 +176,7 @@ internal static void FailReceiveActivity(Activity activity, EventHubsConnectionS
DiagnosticListener.Write(ReceiveActivityExceptionName,
new
{
Endpoint = csb.Endpoint,
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ConsumerGroup = consumerGroup,
Expand All @@ -198,12 +197,12 @@ internal static void StopReceiveActivity(Activity activity, EventHubsConnectionS
DiagnosticListener.StopActivity(activity,
new
{
Endpoint = csb.Endpoint,
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ConsumerGroup = consumerGroup,
EventDatas = events,
Status = receiveTask?.Status
receiveTask?.Status
});
}

Expand Down Expand Up @@ -237,11 +236,9 @@ private static void Inject(EventData eventData, string id, string correlationCon

internal static string SerializeCorrelationContext(IList<KeyValuePair<string, string>> baggage)
{
if (baggage.Any())
{
return string.Join(",", baggage.Select(kvp => kvp.Key + "=" + kvp.Value));
}
return null;
return baggage.Any()
? string.Join(",", baggage.Select(kvp => kvp.Key + "=" + kvp.Value))
: null;
}

private static void SetRelatedOperations(Activity activity, IEnumerable<EventData> eventDatas)
Expand Down
9 changes: 3 additions & 6 deletions src/Microsoft.Azure.EventHubs/EventPosition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static EventPosition FromEnd()
{
return EventPosition.FromOffset(EndOfStream);
}

/// <summary>
/// Creates a position at the given offset.
/// </summary>
Expand All @@ -50,6 +50,7 @@ public static EventPosition FromEnd()
public static EventPosition FromOffset(string offset, bool inclusive = false)
{
Guard.ArgumentNotNullOrWhiteSpace(nameof(offset), offset);

return new EventPosition { Offset = offset, IsInclusive = inclusive };
}

Expand Down Expand Up @@ -96,11 +97,7 @@ public static EventPosition FromEnqueuedTime(DateTime enqueuedTimeUtc)
/// Gets the sequence number of the event at the position. It can be null if the position is just created
/// from an offset or an enqueued time.
/// </summary>
public long? SequenceNumber
{
get;
internal set;
}
public long? SequenceNumber { get; internal set; }

internal string GetExpression()
{
Expand Down
31 changes: 10 additions & 21 deletions src/Microsoft.Azure.EventHubs/PartitionReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ protected internal PartitionReceiver(
this.prefetchCount = DefaultPrefetchCount;
this.Epoch = epoch;
this.RuntimeInfo = new ReceiverRuntimeInformation(partitionId);
this.ReceiverRuntimeMetricEnabled = receiverOptions == null ? this.EventHubClient.EnableReceiverRuntimeMetric
this.ReceiverRuntimeMetricEnabled = receiverOptions == null
? this.EventHubClient.EnableReceiverRuntimeMetric
: receiverOptions.EnableReceiverRuntimeMetric;
this.Identifier = receiverOptions != null ? receiverOptions.Identifier : null;

this.Identifier = receiverOptions != null
? receiverOptions.Identifier
: null;
this.RetryPolicy = eventHubClient.RetryPolicy.Clone();

EventHubsEventSource.Log.ClientCreated(this.ClientId, this.FormatTraceDetails());
Expand Down Expand Up @@ -87,10 +91,7 @@ protected internal PartitionReceiver(
/// <value>The upper limit of events this receiver will actively receive regardless of whether a receive operation is pending.</value>
public int PrefetchCount
{
get
{
return this.prefetchCount;
}
get => this.prefetchCount;

set
{
Expand Down Expand Up @@ -118,11 +119,7 @@ public int PrefetchCount

/// <summary>Gets the identifier of a receiver which was set during the creation of the receiver.</summary>
/// <value>A string representing the identifier of a receiver. It will return null if the identifier is not set.</value>
public string Identifier
{
get;
private set;
}
public string Identifier { get; private set; }

/// <summary>
/// Receive a batch of <see cref="EventData"/>'s from an EventHub partition
Expand Down Expand Up @@ -256,18 +253,10 @@ public sealed override Task CloseAsync()
/// Gets the approximate receiver runtime information for a logical partition of an Event Hub.
/// To enable the setting, refer to <see cref="ReceiverOptions"/> and <see cref="EventHubClient.EnableReceiverRuntimeMetric"/>
/// </summary>
public ReceiverRuntimeInformation RuntimeInfo
{
get;
private set;
}
public ReceiverRuntimeInformation RuntimeInfo { get; private set; }

/// <summary> Gets a value indicating whether the runtime metric of a receiver is enabled. </summary>
public bool ReceiverRuntimeMetricEnabled
{
get;
private set;
}
public bool ReceiverRuntimeMetricEnabled { get; private set; }

/// <summary></summary>
/// <returns></returns>
Expand Down
10 changes: 5 additions & 5 deletions src/Microsoft.Azure.EventHubs/PartitionSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public EventDataBatch CreateBatch()
{
return this.CreateBatch(new BatchOptions());
}

/// <summary>Creates a batch where event data objects can be added for later SendAsync call.</summary>
/// <param name="options"><see cref="BatchOptions" /> to define partition key and max message size.</param>
/// <returns>Returns <see cref="EventDataBatch" />.</returns>
Expand Down Expand Up @@ -172,11 +172,11 @@ public async Task SendAsync(EventDataBatch eventDataBatch)

await this.SendAsync(eventDataBatch.ToEnumerable());
}

/// <summary>
/// Closes and releases resources for the <see cref="PartitionSender"/>.
/// </summary>
/// <returns>An asynchronous operation</returns>
/// Closes and releases resources for the <see cref="PartitionSender"/>.
/// </summary>
/// <returns>An asynchronous operation</returns>
public override async Task CloseAsync()
{
EventHubsEventSource.Log.ClientCloseStart(this.ClientId);
Expand Down
5 changes: 2 additions & 3 deletions src/Microsoft.Azure.EventHubs/Primitives/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public class AsyncLock : IDisposable
{
readonly SemaphoreSlim asyncSemaphore;
readonly SemaphoreSlim asyncSemaphore = new SemaphoreSlim(1);
readonly Task<LockRelease> lockRelease;
bool disposed = false;
bool disposed;

/// <summary>
/// Returns a new AsyncLock.
/// </summary>
public AsyncLock()
{
asyncSemaphore = new SemaphoreSlim(1);
lockRelease = Task.FromResult(new LockRelease(this));
}

Expand Down
5 changes: 1 addition & 4 deletions src/Microsoft.Azure.EventHubs/Primitives/ClientEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ public string ClientId
/// </summary>
public RetryPolicy RetryPolicy
{
get
{
return this.retryPolicy;
}
get => this.retryPolicy;

set
{
Expand Down
8 changes: 1 addition & 7 deletions src/Microsoft.Azure.EventHubs/Primitives/Fx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ static class Fx
{
private static readonly Lazy<ExceptionUtility> exceptionUtility = new Lazy<ExceptionUtility>(() => new ExceptionUtility());

public static ExceptionUtility Exception
{
get
{
return exceptionUtility.Value;
}
}
public static ExceptionUtility Exception => exceptionUtility.Value;

[Conditional("DEBUG")]
public static void Assert(bool condition, string message)
Expand Down
Loading

0 comments on commit 8aae6b6

Please sign in to comment.