Skip to content

Commit

Permalink
Query: Fix memory leak by making FeedIterator IDisposable (#1613)
Browse files Browse the repository at this point in the history
* Add disposable to FeedIterator.
  • Loading branch information
j82w committed Jun 11, 2020
1 parent 74141c9 commit 908cea4
Show file tree
Hide file tree
Showing 18 changed files with 508 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,11 @@ public ComparableTaskScheduler(IEnumerable<IComparableTask> tasks, int maximumCo

public int MaximumConcurrencyLevel { get; private set; }

public int CurrentRunningTaskCount
{
get
{
return this.MaximumConcurrencyLevel - Math.Max(0, this.canRunTaskSemaphoreSlim.CurrentCount);
}
}
public int CurrentRunningTaskCount => this.MaximumConcurrencyLevel - Math.Max(0, this.canRunTaskSemaphoreSlim.CurrentCount);

public bool IsStopped
{
get
{
return this.isStopped;
}
}
public bool IsStopped => this.isStopped;

private CancellationToken CancellationToken
{
get
{
return this.tokenSource.Token;
}
}
private CancellationToken CancellationToken => this.tokenSource.Token;

public void IncreaseMaximumConcurrencyLevel(int delta)
{
Expand All @@ -83,6 +65,9 @@ public void IncreaseMaximumConcurrencyLevel(int delta)
public void Dispose()
{
this.Stop();

this.canRunTaskSemaphoreSlim.Dispose();
this.tokenSource.Dispose();
}

public void Stop()
Expand Down
6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,11 @@ public override CosmosElement GetCosmosElementContinuationToken()
{
return this.cosmosQueryExecutionContext.GetCosmosElementContinuationToken();
}

protected override void Dispose(bool disposing)
{
this.cosmosQueryExecutionContext.Dispose();
base.Dispose(disposing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -33,8 +34,10 @@ namespace Microsoft.Azure.Cosmos
/// ]]>
/// </code>
/// </example>
public abstract class FeedIterator
public abstract class FeedIterator : IDisposable
{
private bool disposedValue;

/// <summary>
/// Tells if there is more results that need to be retrieved from the service
/// </summary>
Expand Down Expand Up @@ -92,5 +95,29 @@ public abstract class FeedIterator
/// </code>
/// </example>
public abstract Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Releases the unmanaged resources used by the FeedIterator and optionally
/// releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
{
// Default implementation does not need to clean anything up
if (!this.disposedValue)
{
this.disposedValue = true;
}
}

/// <summary>
/// Releases the unmanaged resources used by the FeedIterator and optionally
/// releases the managed resources.
/// </summary>
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
this.Dispose(disposing: true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,11 @@ public override async Task<FeedResponse<T>> ReadNextAsync(CancellationToken canc
ResponseMessage response = await this.feedIterator.ReadNextAsync(cancellationToken);
return this.responseCreator(response);
}

protected override void Dispose(bool disposing)
{
this.feedIterator.Dispose();
base.Dispose(disposing);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;

internal sealed class FeedIteratorInlineCore : FeedIteratorInternal
{
private readonly FeedIteratorInternal feedIteratorInternal;

internal FeedIteratorInlineCore(
FeedIterator feedIterator)
{
if (!(feedIterator is FeedIteratorInternal feedIteratorInternal))
{
throw new ArgumentNullException(nameof(feedIterator));
}

this.feedIteratorInternal = feedIteratorInternal;
}

internal FeedIteratorInlineCore(
FeedIteratorInternal feedIteratorInternal)
{
this.feedIteratorInternal = feedIteratorInternal ?? throw new ArgumentNullException(nameof(feedIteratorInternal));
}

public override bool HasMoreResults => this.feedIteratorInternal.HasMoreResults;

public override CosmosElement GetCosmosElementContinuationToken()
{
return this.feedIteratorInternal.GetCosmosElementContinuationToken();
}

public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.feedIteratorInternal.ReadNextAsync(cancellationToken));
}

protected override void Dispose(bool disposing)
{
this.feedIteratorInternal.Dispose();
base.Dispose(disposing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,6 @@ namespace Microsoft.Azure.Cosmos
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;

internal sealed class FeedIteratorInlineCore : FeedIteratorInternal
{
private readonly FeedIteratorInternal feedIteratorInternal;

internal FeedIteratorInlineCore(
FeedIterator feedIterator)
{
if (feedIterator is FeedIteratorInternal feedIteratorInternal)
{
this.feedIteratorInternal = feedIteratorInternal;
}
else
{
throw new ArgumentNullException(nameof(feedIterator));
}
}

internal FeedIteratorInlineCore(
FeedIteratorInternal feedIteratorInternal)
{
this.feedIteratorInternal = feedIteratorInternal ?? throw new ArgumentNullException(nameof(feedIteratorInternal));
}

public override bool HasMoreResults => this.feedIteratorInternal.HasMoreResults;

public override CosmosElement GetCosmosElementContinuationToken()
{
return this.feedIteratorInternal.GetCosmosElementContinuationToken();
}

public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.feedIteratorInternal.ReadNextAsync(cancellationToken));
}
}

internal sealed class FeedIteratorInlineCore<T> : FeedIteratorInternal<T>
{
Expand All @@ -53,14 +16,12 @@ internal sealed class FeedIteratorInlineCore<T> : FeedIteratorInternal<T>
internal FeedIteratorInlineCore(
FeedIterator<T> feedIterator)
{
if (feedIterator is FeedIteratorInternal<T> feedIteratorInternal)
{
this.feedIteratorInternal = feedIteratorInternal;
}
else
if (!(feedIterator is FeedIteratorInternal<T> feedIteratorInternal))
{
throw new ArgumentNullException(nameof(feedIterator));
}

this.feedIteratorInternal = feedIteratorInternal;
}

internal FeedIteratorInlineCore(
Expand All @@ -80,5 +41,11 @@ public override CosmosElement GetCosmosElementContinuationToken()
{
return this.feedIteratorInternal.GetCosmosElementContinuationToken();
}

protected override void Dispose(bool disposing)
{
this.feedIteratorInternal.Dispose();
base.Dispose(disposing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -29,8 +30,10 @@ namespace Microsoft.Azure.Cosmos
/// ]]>
/// </code>
/// </example>
public abstract class FeedIterator<T>
public abstract class FeedIterator<T> : IDisposable
{
private bool disposedValue;

/// <summary>
/// Tells if there is more results that need to be retrieved from the service
/// </summary>
Expand Down Expand Up @@ -80,5 +83,29 @@ public abstract class FeedIterator<T>
/// </code>
/// </example>
public abstract Task<FeedResponse<T>> ReadNextAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Releases the unmanaged resources used by the FeedIterator and optionally
/// releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
{
// Default implementation does not need to clean anything up
if (!this.disposedValue)
{
this.disposedValue = true;
}
}

/// <summary>
/// Releases the unmanaged resources used by the FeedIterator and optionally
/// releases the managed resources.
/// </summary>
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
this.Dispose(disposing: true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,21 +230,22 @@ public async Task QueryRequestRateTest(bool directMode)
List<dynamic> results = new List<dynamic>();
try
{
FeedIterator<dynamic> feedIterator = containerWithThrottle.GetItemQueryIterator<dynamic>(
using (FeedIterator<dynamic> feedIterator = containerWithThrottle.GetItemQueryIterator<dynamic>(
"select * from T where STARTSWITH(T.id, \"BasicQueryItem\")",
requestOptions: new QueryRequestOptions()
{
MaxItemCount = 1,
MaxConcurrency = 1
});

while (feedIterator.HasMoreResults)
}))
{
FeedResponse<dynamic> response = await feedIterator.ReadNextAsync();
Assert.IsTrue(response.Count <= 1);
Assert.IsTrue(response.Resource.Count() <= 1);
while (feedIterator.HasMoreResults)
{
FeedResponse<dynamic> response = await feedIterator.ReadNextAsync();
Assert.IsTrue(response.Count <= 1);
Assert.IsTrue(response.Resource.Count() <= 1);

results.AddRange(response);
results.AddRange(response);
}
}
Assert.Fail("Should throw 429 exception after the first page.");
}
Expand Down Expand Up @@ -365,22 +366,23 @@ public async Task ItemTest(bool directMode)
Assert.AreEqual(1, results.Count);

// LINQ to feed iterator Read All with partition key
FeedIterator<dynamic> iterator = container.GetItemLinqQueryable<dynamic>(
using (FeedIterator<dynamic> iterator = container.GetItemLinqQueryable<dynamic>(
allowSynchronousQueryExecution: true,
requestOptions: new QueryRequestOptions()
{
MaxItemCount = 1,
PartitionKey = new PartitionKey("BasicQueryItem")
}).ToFeedIterator();

List<dynamic> linqResults = new List<dynamic>();
while (iterator.HasMoreResults)
}).ToFeedIterator())
{
linqResults.AddRange(await iterator.ReadNextAsync());
}
List<dynamic> linqResults = new List<dynamic>();
while (iterator.HasMoreResults)
{
linqResults.AddRange(await iterator.ReadNextAsync());
}

Assert.AreEqual(1, linqResults.Count);
Assert.AreEqual("BasicQueryItem", linqResults.First().pk.ToString());
Assert.AreEqual(1, linqResults.Count);
Assert.AreEqual("BasicQueryItem", linqResults.First().pk.ToString());
}
}

[TestMethod]
Expand Down
Loading

0 comments on commit 908cea4

Please sign in to comment.