Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CosmosDiagnostics: Fixes memory leak due to unbounded growth of CosmosDiagnosticsContextCore and removes reference to response stream. #2129

Merged
merged 15 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<ClientOfficialVersion>3.16.0</ClientOfficialVersion>
<ClientPreviewVersion>3.15.2</ClientPreviewVersion>
<DirectVersion>3.15.4</DirectVersion>
<DirectVersion>3.17.0</DirectVersion>
neildsh marked this conversation as resolved.
Show resolved Hide resolved
<EncryptionVersion>1.0.0-preview9</EncryptionVersion>
<HybridRowVersion>1.1.0-preview1</HybridRowVersion>
<AboveDirBuildProps>$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))</AboveDirBuildProps>
Expand Down
91 changes: 91 additions & 0 deletions Microsoft.Azure.Cosmos/src/Diagnostics/BoundedList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Diagnostics
{
using System;
using System.Collections;
using System.Collections.Generic;

/// <summary>
/// A list that can grow only up to a specified capacity.
/// In the growth phase, it uses the standard List type.
/// At capacity, it switches to a circular queue implementation.
/// </summary>
internal sealed class BoundedList<T> : IEnumerable<T>
{
private readonly int capacity;

private List<T> elementList;

private CircularQueue<T> circularQueue;

public BoundedList(int capacity)
{
if (capacity < 1)
{
throw new ArgumentOutOfRangeException("BoundedList capacity must be positive");
}

this.capacity = capacity;
this.elementList = new List<T>();
this.circularQueue = null;
}

public void Add(T element)
{
if (this.circularQueue != null)
{
this.circularQueue.Add(element);
neildsh marked this conversation as resolved.
Show resolved Hide resolved
}
else if (this.elementList.Count < this.capacity)
{
this.elementList.Add(element);
neildsh marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
this.circularQueue = new CircularQueue<T>(this.capacity);
this.circularQueue.AddRange(this.elementList);
this.elementList = null;
this.circularQueue.Add(element);
}
}

public void AddRange(IEnumerable<T> elements)
{
foreach (T element in elements)
{
this.Add(element);
}
}

public IEnumerator<T> GetListEnumerator()
{
// Using a for loop with a yield prevents Issue #1467 which causes
// ThrowInvalidOperationException if a new diagnostics is getting added
// while the enumerator is being used.
List<T> elements = this.elementList;
for (int index = 0; index < elements.Count; ++index)
{
yield return elements[index];
}
}

public IEnumerator<T> GetEnumerator()
{
if (this.circularQueue != null)
{
return this.circularQueue.GetEnumerator();
}
else
{
return this.GetListEnumerator();
}
}

IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
}
}
111 changes: 111 additions & 0 deletions Microsoft.Azure.Cosmos/src/Diagnostics/CircularQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Diagnostics
{
using System;
using System.Collections;
using System.Collections.Generic;

/// <summary>
/// simple circular queue that preallocates the underlying buffer
/// </summary>
internal sealed class CircularQueue<T> : IEnumerable<T>
{
private readonly T[] buffer;

private int head;
private int tail;

/// <summary>
/// Capacity of the queue.
/// </summary>
public int Capacity => this.buffer.Length;

/// <summary>
/// True if adding an element will cause one to be evicted.
/// </summary>
public bool Full => this.GetNextIndex(this.tail) == this.head;

/// <summary>
/// True when the queue is empty.
/// </summary>
public bool Empty => this.tail == this.head;

/// <summary>
/// Initializes a new instance of the <see cref="CircularQueue{T}"/> class.
/// </summary>
/// <param name="capacity"></param>
public CircularQueue(int capacity)
{
if (capacity < 1)
{
throw new ArgumentOutOfRangeException("circular queue capacity must be positive");
}

this.head = 0;
this.tail = 0;
this.buffer = new T[capacity + 1]; // one empty slot
}

/// <summary>
/// Adds a new element to the queue. Can cause an older element to be evicted.
/// </summary>
/// <param name="element"></param>
public void Add(T element)
{
if (this.Full)
neildsh marked this conversation as resolved.
Show resolved Hide resolved
{
this.TryPop(out _);
}

this.buffer[this.tail] = element;
this.tail = this.GetNextIndex(this.tail);
}

/// <summary>
/// Adds a subrange of the argument to the queue depending on capacity.
/// </summary>
/// <param name="elements"></param>
public void AddRange(IEnumerable<T> elements)
{
foreach (T element in elements)
{
this.Add(element);
}
}

private int GetNextIndex(int index)
{
return (index + 1) % this.Capacity;
}

private bool TryPop(out T element)
{
element = default;
if (this.Empty) return false;

element = this.buffer[this.head];
this.head = this.GetNextIndex(this.head);
return true;
}

/// <inheritdoc/>
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}

/// <inheritdoc/>
public IEnumerator<T> GetEnumerator()
{
if (!this.Empty)
{
for (int i = this.head; i != this.tail; i = this.GetNextIndex(i))
{
yield return this.buffer[i];
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ namespace Microsoft.Azure.Cosmos
internal sealed class CosmosDiagnosticsContextCore : CosmosDiagnosticsContext
{
private static readonly string DefaultUserAgentString;
internal static int Capacity = 5120;
neildsh marked this conversation as resolved.
Show resolved Hide resolved
private readonly CosmosDiagnosticScope overallScope;

/// <summary>
/// Detailed view of all the operations.
/// </summary>
private List<CosmosDiagnosticsInternal> ContextList { get; }
private BoundedList<CosmosDiagnosticsInternal> ContextList { get; }

private int totalResponseCount = 0;
private int failedResponseCount = 0;
Expand All @@ -49,7 +50,7 @@ public CosmosDiagnosticsContextCore(
this.UserAgent = userAgentString ?? throw new ArgumentNullException(nameof(userAgentString));
this.OperationName = operationName ?? throw new ArgumentNullException(nameof(operationName));
this.StartUtc = DateTime.UtcNow;
this.ContextList = new List<CosmosDiagnosticsInternal>();
this.ContextList = new BoundedList<CosmosDiagnosticsInternal>(Capacity);
this.Diagnostics = new CosmosDiagnosticsCore(this);
this.overallScope = new CosmosDiagnosticScope("Overall");
}
Expand Down Expand Up @@ -137,9 +138,9 @@ internal override void AddDiagnosticsInternal(PointOperationStatistics pointOper

internal override void AddDiagnosticsInternal(StoreResponseStatistics storeResponseStatistics)
{
if (storeResponseStatistics.StoreResult != null)
if (storeResponseStatistics.StoreResultStatistics != null)
{
this.AddResponseCount((int)storeResponseStatistics.StoreResult.StatusCode);
this.AddResponseCount((int)storeResponseStatistics.StoreResultStatistics.StatusCode);
}

this.AddToContextList(storeResponseStatistics);
Expand Down Expand Up @@ -199,9 +200,9 @@ public override IEnumerator<CosmosDiagnosticsInternal> GetEnumerator()
// while the enumerator is being used.
lock (this.ContextList)
{
for (int i = 0; i < this.ContextList.Count; i++)
foreach (CosmosDiagnosticsInternal context in this.ContextList)
{
yield return this.ContextList[i];
yield return context;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,12 @@ public override void Visit(StoreResponseStatistics storeResponseStatistics)
this.jsonWriter.WritePropertyName("LocationEndpoint");
this.jsonWriter.WriteValue(storeResponseStatistics.LocationEndpoint);

if (storeResponseStatistics.StoreResult != null)
if (storeResponseStatistics.StoreResultStatistics != null)
{
this.jsonWriter.WritePropertyName("ActivityId");
this.jsonWriter.WriteValue(storeResponseStatistics.StoreResult.ActivityId);

this.jsonWriter.WriteValue(storeResponseStatistics.StoreResultStatistics.ActivityId);
this.jsonWriter.WritePropertyName("StoreResult");
this.jsonWriter.WriteValue(storeResponseStatistics.StoreResult.ToString());
this.jsonWriter.WriteValue(storeResponseStatistics.StoreResultStatistics.ToString());
}

this.jsonWriter.WriteEndObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ internal sealed class StoreResponseStatistics : CosmosDiagnosticsInternal
{
public readonly DateTime? RequestStartTime;
public readonly DateTime RequestResponseTime;
public readonly StoreResult StoreResult;
public readonly ResourceType RequestResourceType;
public readonly OperationType RequestOperationType;
public readonly Uri LocationEndpoint;
public readonly bool IsSupplementalResponse;
public readonly StoreResultStatistics StoreResultStatistics;

public StoreResponseStatistics(
DateTime? requestStartTime,
Expand All @@ -26,11 +26,28 @@ public StoreResponseStatistics(
{
this.RequestStartTime = requestStartTime;
this.RequestResponseTime = requestResponseTime;
this.StoreResult = storeResult;
this.RequestResourceType = resourceType;
this.RequestOperationType = operationType;
this.LocationEndpoint = locationEndpoint;
this.IsSupplementalResponse = operationType == OperationType.Head || operationType == OperationType.HeadFeed;

if (storeResult != null)
{
this.StoreResultStatistics = new StoreResultStatistics(
exception: storeResult.Exception,
statusCode: storeResult.StatusCode,
subStatusCode: storeResult.SubStatusCode,
partitionKeyRangeId: storeResult.PartitionKeyRangeId,
lsn: storeResult.LSN,
requestCharge: storeResult.RequestCharge,
isValid: storeResult.IsValid,
storePhysicalAddress: storeResult.StorePhysicalAddress,
globalCommittedLSN: storeResult.GlobalCommittedLSN,
itemLSN: storeResult.ItemLSN,
sessionToken: storeResult.SessionToken,
usingLocalLSN: storeResult.UsingLocalLSN,
activityId: storeResult.ActivityId);
}
}

public override void Accept(CosmosDiagnosticsInternalVisitor visitor)
Expand Down
Loading