Skip to content

Commit

Permalink
Query: Remove serialization of async query code (#17016)
Browse files Browse the repository at this point in the history
This was causing deadlock issue in product because after concurrency exception the context is unusuable and in pooling scenario it caused forever wait in async queries

Resolves #10914
Resolves #12138
  • Loading branch information
smitpatel committed Aug 8, 2019
1 parent b19d2e0 commit 6f8f396
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
using (await _cosmosQueryContext.ConcurrencyDetector.EnterCriticalSectionAsync(_cancellationToken))
using (_cosmosQueryContext.ConcurrencyDetector.EnterCriticalSection())
{
if (_enumerator == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public static async Task<int> ExecuteSqlCommandAsync(
var concurrencyDetector = facadeDependencies.ConcurrencyDetector;
var logger = facadeDependencies.CommandLogger;

using (await concurrencyDetector.EnterCriticalSectionAsync(cancellationToken))
using (concurrencyDetector.EnterCriticalSection())
{
var rawSqlCommand = GetFacadeDependencies(databaseFacade).RawSqlCommandBuilder
.Build(sql.Format, parameters);
Expand Down Expand Up @@ -645,7 +645,7 @@ public static async Task<int> ExecuteSqlRawAsync(
var concurrencyDetector = facadeDependencies.ConcurrencyDetector;
var logger = facadeDependencies.CommandLogger;

using (await concurrencyDetector.EnterCriticalSectionAsync(cancellationToken))
using (concurrencyDetector.EnterCriticalSection())
{
var rawSqlCommand = GetFacadeDependencies(databaseFacade).RawSqlCommandBuilder
.Build(sql, parameters);
Expand Down
2 changes: 1 addition & 1 deletion src/EFCore.Relational/Query/AsyncQueryingEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
using (await _relationalQueryContext.ConcurrencyDetector.EnterCriticalSectionAsync(_cancellationToken))
using (_relationalQueryContext.ConcurrencyDetector.EnterCriticalSection())
{
if (_dataReader == null)
{
Expand Down
54 changes: 1 addition & 53 deletions src/EFCore/Internal/ConcurrencyDetector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ namespace Microsoft.EntityFrameworkCore.Internal
/// The implementation does not need to be thread-safe.
/// </para>
/// </summary>
public class ConcurrencyDetector : IConcurrencyDetector, IDisposable
public class ConcurrencyDetector : IConcurrencyDetector
{
private readonly IDisposable _disposer;

private SemaphoreSlim _semaphore = new SemaphoreSlim(1);

private int _inCriticalSection;

/// <summary>
Expand Down Expand Up @@ -64,43 +61,6 @@ private void ExitCriticalSection()
_inCriticalSection = 0;
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual async Task<IDisposable> EnterCriticalSectionAsync(CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellationToken);

return new AsyncDisposer(EnterCriticalSection(), this);
}

private readonly struct AsyncDisposer : IDisposable
{
private readonly IDisposable _disposable;
private readonly ConcurrencyDetector _concurrencyDetector;

public AsyncDisposer(IDisposable disposable, ConcurrencyDetector concurrencyDetector)
{
_disposable = disposable;
_concurrencyDetector = concurrencyDetector;
}

public void Dispose()
{
_disposable.Dispose();

if (_concurrencyDetector._semaphore == null)
{
throw new ObjectDisposedException(GetType().ShortDisplayName(), CoreStrings.ContextDisposed);
}

_concurrencyDetector._semaphore.Release();
}
}

private readonly struct Disposer : IDisposable
{
private readonly ConcurrencyDetector _concurrencyDetector;
Expand All @@ -110,17 +70,5 @@ public Disposer(ConcurrencyDetector concurrencyDetector)

public void Dispose() => _concurrencyDetector.ExitCriticalSection();
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual void Dispose()
{
_semaphore?.Dispose();
_semaphore = null;
}
}
}
10 changes: 0 additions & 10 deletions src/EFCore/Internal/IConcurrencyDetector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace Microsoft.EntityFrameworkCore.Internal
Expand Down Expand Up @@ -31,13 +29,5 @@ public interface IConcurrencyDetector
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
IDisposable EnterCriticalSection();

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
Task<IDisposable> EnterCriticalSectionAsync(CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public virtual async Task Executes_stored_procedure_with_generated_parameter_asy
}
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual async Task Throws_on_concurrent_command_async()
{
using (var context = CreateContext())
Expand Down
14 changes: 7 additions & 7 deletions test/EFCore.Specification.Tests/ConcurrencyDetectorTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public virtual Task Find_logs_concurrent_access_nonasync()
});
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual Task Find_logs_concurrent_access_async()
{
return ConcurrencyDetectorTest(c => c.Products.FindAsync(1).AsTask());
Expand All @@ -68,7 +68,7 @@ public virtual Task Count_logs_concurrent_access_nonasync()
});
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual Task Count_logs_concurrent_access_async()
{
return ConcurrencyDetectorTest(c => c.Products.CountAsync());
Expand All @@ -85,7 +85,7 @@ public virtual Task First_logs_concurrent_access_nonasync()
});
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual Task First_logs_concurrent_access_async()
{
return ConcurrencyDetectorTest(c => c.Products.FirstAsync());
Expand All @@ -102,7 +102,7 @@ public virtual Task Last_logs_concurrent_access_nonasync()
});
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual Task Last_logs_concurrent_access_async()
{
return ConcurrencyDetectorTest(c => c.Products.LastAsync());
Expand All @@ -119,7 +119,7 @@ public virtual Task Single_logs_concurrent_access_nonasync()
});
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual Task Single_logs_concurrent_access_async()
{
return ConcurrencyDetectorTest(c => c.Products.SingleAsync(p => p.ProductID == 1));
Expand All @@ -136,7 +136,7 @@ public virtual Task Any_logs_concurrent_access_nonasync()
});
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual Task Any_logs_concurrent_access_async()
{
return ConcurrencyDetectorTest(c => c.Products.AnyAsync(p => p.ProductID < 10));
Expand All @@ -153,7 +153,7 @@ public virtual Task ToList_logs_concurrent_access_nonasync()
});
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact]
public virtual Task ToList_logs_concurrent_access_async()
{
return ConcurrencyDetectorTest(c => c.Products.ToListAsync());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public virtual async Task Mixed_sync_async_in_query_cache()
}
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact(Skip = "Issue#17019")]
public virtual async Task Throws_on_concurrent_query_list()
{
using (var context = CreateContext())
Expand Down Expand Up @@ -298,7 +298,7 @@ public virtual async Task Throws_on_concurrent_query_list()
}
}

[ConditionalFact(Skip = "#12138")]
[ConditionalFact(Skip = "Issue#17019")]
public virtual async Task Throws_on_concurrent_query_first()
{
using (var context = CreateContext())
Expand All @@ -316,6 +316,7 @@ public virtual async Task Throws_on_concurrent_query_first()
async () =>
{
synchronizationEvent.Wait();
Assert.Equal(
CoreStrings.ConcurrentMethodInvocation,
(await Assert.ThrowsAsync<InvalidOperationException>(
Expand Down

0 comments on commit 6f8f396

Please sign in to comment.