Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SynchronizationContext and blocking calls #1078

Merged
merged 46 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
64dba35
Adding context
ealsur Dec 3, 2019
b321a58
Adding netfx test project
ealsur Dec 4, 2019
e70d055
conditional Task.Run
ealsur Dec 4, 2019
1da379b
Using TaskHelper
ealsur Dec 4, 2019
73025c8
Wrapping more calls
ealsur Dec 4, 2019
8cd291a
yml update
ealsur Dec 5, 2019
0b8d673
Adding query in test
ealsur Dec 5, 2019
a14d9dc
Undoing
ealsur Dec 5, 2019
050374c
Removing duplicates
ealsur Dec 5, 2019
a311148
Removing more duplicates
ealsur Dec 5, 2019
a5766e4
Adding UTs for TaskHelper
ealsur Dec 5, 2019
cac3713
Removing unneeded file
ealsur Dec 5, 2019
205721e
Undoing more changes
ealsur Dec 5, 2019
aef172b
missing indent
ealsur Dec 5, 2019
377a80a
Changing assemblyname
ealsur Dec 5, 2019
4c14f77
Testing another SNK
ealsur Dec 5, 2019
ea501f8
merge
ealsur Dec 5, 2019
f51e21f
YML changes
ealsur Dec 5, 2019
15b9b3d
AssemblyName
ealsur Dec 5, 2019
9b6ac5d
Testing properties
ealsur Dec 5, 2019
e4e32db
Testing with 471
ealsur Dec 5, 2019
021e482
Testing netcore
ealsur Dec 6, 2019
70d8a21
net461
ealsur Dec 6, 2019
148d10c
Original setup
ealsur Dec 6, 2019
e8a3f5e
merge with master
ealsur Dec 6, 2019
cd0de30
Testing with vstest
ealsur Dec 6, 2019
c6573ae
paths
ealsur Dec 6, 2019
b281ee8
Using full path
ealsur Dec 6, 2019
3fb71a8
Updating vstest
ealsur Dec 6, 2019
a9c33f8
Forcing version
ealsur Dec 6, 2019
22b4e39
Refactoring
ealsur Dec 6, 2019
ec8605a
Undoing other changes
ealsur Dec 6, 2019
5987e46
Parameters
ealsur Dec 6, 2019
858085a
breaklines
ealsur Dec 6, 2019
580733d
Undoing some changes
ealsur Dec 9, 2019
593f869
Inline wrapper not conditional
ealsur Dec 9, 2019
35db756
Fixing tests
ealsur Dec 9, 2019
526797b
internal properties
ealsur Dec 9, 2019
163e4ac
Fixing tests
ealsur Dec 10, 2019
899db30
merge with master
ealsur Dec 10, 2019
8f9edfc
Fixing more tests
ealsur Dec 10, 2019
0a351aa
merge with master again
ealsur Dec 10, 2019
609b3ee
Fixing tests
ealsur Dec 10, 2019
88c19bc
merge with master
ealsur Dec 10, 2019
4b6161f
Merge branch 'master' into users/ealsur/synchcontext
kirankumarkolli Dec 10, 2019
5115829
Merge branch 'master' into users/ealsur/synchcontext
j82w Dec 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public ChangeFeedProcessorBuilder WithLeaseContainer(Container leaseContainer)
if (this.leaseContainer != null) throw new InvalidOperationException("The builder already defined a lease container.");
if (this.LeaseStoreManager != null) throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");

this.leaseContainer = (ContainerCore)leaseContainer;
this.leaseContainer = (ContainerInlineCore)leaseContainer;
return this;
}

Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public virtual Task<AccountProperties> ReadAccountAsync()
/// <returns>Cosmos database proxy</returns>
public virtual Database GetDatabase(string id)
{
return new DatabaseCore(this.ClientContext, id);
return new DatabaseInlineCore(new DatabaseCore(this.ClientContext, id));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;

// This class acts as a wrapper for environments that use SynchronizationContext.
internal sealed class ConflictsInlineCore : Conflicts
{
private readonly ConflictsCore conflicts;

internal ConflictsInlineCore(ConflictsCore conflicts)
{
if (conflicts == null)
{
throw new ArgumentNullException(nameof(conflicts));
}

this.conflicts = conflicts;
}

public override Task<ResponseMessage> DeleteAsync(
ConflictProperties conflict,
PartitionKey partitionKey,
CancellationToken cancellationToken = default(CancellationToken))
{
return TaskHelper.RunInlineIfNeededAsync(() => this.conflicts.DeleteAsync(conflict, partitionKey, cancellationToken));
}

public override FeedIterator GetConflictQueryStreamIterator(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryStreamIterator(queryText, continuationToken, requestOptions);
}

public override FeedIterator<T> GetConflictQueryIterator<T>(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryIterator<T>(queryText, continuationToken, requestOptions);
}

public override FeedIterator GetConflictQueryStreamIterator(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryStreamIterator(queryDefinition, continuationToken, requestOptions);
}

public override FeedIterator<T> GetConflictQueryIterator<T>(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryIterator<T>(queryDefinition, continuationToken, requestOptions);
}

public override Task<ItemResponse<T>> ReadCurrentAsync<T>(
ConflictProperties cosmosConflict,
PartitionKey partitionKey,
CancellationToken cancellationToken = default(CancellationToken))
{
return TaskHelper.RunInlineIfNeededAsync(() => this.conflicts.ReadCurrentAsync<T>(cosmosConflict, partitionKey, cancellationToken));
}

public override T ReadConflictContent<T>(ConflictProperties cosmosConflict)
{
return this.conflicts.ReadConflictContent<T>(cosmosConflict);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ internal ContainerCore(
id: containerId);

this.Database = database;
this.Conflicts = new ConflictsCore(this.ClientContext, this);
this.Scripts = new ScriptsCore(this, this.ClientContext);
this.Conflicts = new ConflictsInlineCore(new ConflictsCore(this.ClientContext, this));
this.Scripts = new ScriptsInlineCore(new ScriptsCore(this, this.ClientContext));
this.cachedUriSegmentWithoutId = this.GetResourceSegmentUriWithoutId();
this.queryClient = cosmosQueryClient ?? new CosmosQueryClientCore(this.ClientContext, this);
this.BatchExecutor = this.InitializeBatchExecutorForContainer();
Expand Down
252 changes: 252 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// This class acts as a wrapper for environments that use SynchronizationContext.
internal sealed partial class ContainerInlineCore : Container
{
private readonly ContainerCore container;

public override string Id => this.container.Id;

public override Conflicts Conflicts => this.container.Conflicts;

public override Scripts.Scripts Scripts => this.container.Scripts;

internal CosmosClientContext ClientContext => this.container.ClientContext;

internal Uri LinkUri => this.container.LinkUri;

internal ContainerInlineCore(ContainerCore container)
{
if (container == null)
{
throw new ArgumentNullException(nameof(container));
}

this.container = container;
}

public override Task<ContainerResponse> ReadContainerAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadContainerAsync(requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReadContainerStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadContainerStreamAsync(requestOptions, cancellationToken));
}

public override Task<ContainerResponse> ReplaceContainerAsync(
ContainerProperties containerProperties,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceContainerAsync(containerProperties, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReplaceContainerStreamAsync(
ContainerProperties containerProperties,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceContainerStreamAsync(containerProperties, requestOptions, cancellationToken));
}

public override Task<ContainerResponse> DeleteContainerAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteContainerAsync(requestOptions, cancellationToken));
}

public override Task<ResponseMessage> DeleteContainerStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteContainerStreamAsync(requestOptions, cancellationToken));
}

public override Task<int?> ReadThroughputAsync(CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadThroughputAsync(cancellationToken));
}

public override Task<ThroughputResponse> ReadThroughputAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadThroughputAsync(requestOptions, cancellationToken));
}

public override Task<ThroughputResponse> ReplaceThroughputAsync(
int throughput,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceThroughputAsync(throughput, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> CreateItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.CreateItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> CreateItemAsync<T>(T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.CreateItemAsync<T>(item, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReadItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadItemStreamAsync(id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> ReadItemAsync<T>(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadItemAsync<T>(id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> UpsertItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.UpsertItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> UpsertItemAsync<T>(
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.UpsertItemAsync<T>(item, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReplaceItemStreamAsync(
Stream streamPayload,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceItemStreamAsync(streamPayload, id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> ReplaceItemAsync<T>(
T item,
string id,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceItemAsync<T>(item, id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> DeleteItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteItemStreamAsync(id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> DeleteItemAsync<T>(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteItemAsync<T>(id, partitionKey, requestOptions, cancellationToken));
}

public override FeedIterator GetItemQueryStreamIterator(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryStreamIterator(queryDefinition, continuationToken, requestOptions);
}

public override FeedIterator<T> GetItemQueryIterator<T>(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryIterator<T>(queryDefinition, continuationToken, requestOptions);
}

public override FeedIterator GetItemQueryStreamIterator(string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryStreamIterator(queryText, continuationToken, requestOptions);
}

public override FeedIterator<T> GetItemQueryIterator<T>(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryIterator<T>(queryText, continuationToken, requestOptions);
}

public override IOrderedQueryable<T> GetItemLinqQueryable<T>(bool allowSynchronousQueryExecution = false,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemLinqQueryable<T>(allowSynchronousQueryExecution, continuationToken, requestOptions);
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
string processorName,
ChangesHandler<T> onChangesDelegate)
{
return this.container.GetChangeFeedProcessorBuilder<T>(processorName, onChangesDelegate);
}

public override ChangeFeedProcessorBuilder GetChangeFeedEstimatorBuilder(string processorName,
ChangesEstimationHandler estimationDelegate,
TimeSpan? estimationPeriod = null)
{
return this.container.GetChangeFeedEstimatorBuilder(processorName, estimationDelegate, estimationPeriod);
}

public override TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey)
{
return this.container.CreateTransactionalBatch(partitionKey);
}

public static implicit operator ContainerCore(ContainerInlineCore containerInlineCore) => containerInlineCore.container;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ public override Container GetContainer(string id)
throw new ArgumentNullException(nameof(id));
}

return new ContainerCore(
return new ContainerInlineCore(new ContainerCore(
this.ClientContext,
this,
id);
id));
}

public override Task<ResponseMessage> CreateContainerStreamAsync(
Expand Down Expand Up @@ -315,10 +315,10 @@ public override User GetUser(string id)
throw new ArgumentNullException(nameof(id));
}

return new UserCore(
return new UserInlineCore(new UserCore(
this.ClientContext,
this,
id);
id));
}

public Task<ResponseMessage> CreateUserStreamAsync(
Expand Down
Loading