Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Response type: Fix deadlock on scenarios with SynchronizationContext when using Response.Container #1401

Merged
merged 16 commits into from
Apr 24, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal class BatchAsyncContainerExecutor : IDisposable
private const int DefaultDispatchTimerInSeconds = 1;
private const int MinimumDispatchTimerInSeconds = 1;

private readonly ContainerCore cosmosContainer;
private readonly ContainerInternal cosmosContainer;
private readonly CosmosClientContext cosmosClientContext;
private readonly int maxServerRequestBodyLength;
private readonly int maxServerRequestOperationCount;
Expand All @@ -49,7 +49,7 @@ internal BatchAsyncContainerExecutor()
}

public BatchAsyncContainerExecutor(
ContainerCore cosmosContainer,
ContainerInternal cosmosContainer,
CosmosClientContext cosmosClientContext,
int maxServerRequestOperationCount,
int maxServerRequestBodyLength,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal class BatchAsyncContainerExecutorCache : IDisposable
private ConcurrentDictionary<string, BatchAsyncContainerExecutor> executorsPerContainer = new ConcurrentDictionary<string, BatchAsyncContainerExecutor>();

public BatchAsyncContainerExecutor GetExecutorForContainer(
ContainerCore container,
ContainerInternal container,
CosmosClientContext cosmosClientContext)
{
if (!cosmosClientContext.ClientOptions.AllowBulkExecution)
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal class BatchCore : TransactionalBatch
{
private readonly PartitionKey partitionKey;

private readonly ContainerCore container;
private readonly ContainerInternal container;

private List<ItemBatchOperation> operations;

Expand All @@ -25,7 +25,7 @@ internal class BatchCore : TransactionalBatch
/// <param name="container">Container that has items on which batch operations are to be performed.</param>
/// <param name="partitionKey">The partition key for all items in the batch. <see cref="PartitionKey"/>.</param>
internal BatchCore(
ContainerCore container,
ContainerInternal container,
PartitionKey partitionKey)
{
this.container = container;
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/BatchExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos

internal sealed class BatchExecutor
{
private readonly ContainerCore container;
private readonly ContainerInternal container;

private readonly CosmosClientContext clientContext;

Expand All @@ -28,7 +28,7 @@ internal sealed class BatchExecutor
private readonly CosmosDiagnosticsContext diagnosticsContext;

public BatchExecutor(
ContainerCore container,
ContainerInternal container,
PartitionKey partitionKey,
IReadOnlyList<ItemBatchOperation> operations,
RequestOptions batchOptions,
Expand Down
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public ItemBatchOperation(
public ItemBatchOperation(
OperationType operationType,
int operationIndex,
ContainerCore containerCore,
ContainerInternal containerCore,
string id = null,
Stream resourceStream = null,
TransactionalBatchItemRequestOptions requestOptions = null)
{
this.OperationType = operationType;
this.OperationIndex = operationIndex;
this.ContainerCore = containerCore;
this.ContainerInternal = containerCore;
this.Id = id;
this.ResourceStream = resourceStream;
this.RequestOptions = requestOptions;
Expand All @@ -72,7 +72,7 @@ public ItemBatchOperation(

public int OperationIndex { get; internal set; }

internal ContainerCore ContainerCore { get; }
internal ContainerInternal ContainerInternal { get; }

internal CosmosDiagnosticsContext DiagnosticsContext { get; set; }

Expand Down Expand Up @@ -321,12 +321,12 @@ internal virtual async Task EncryptAndMaterializeResourceAsync(CosmosSerializerC
if (this.body.IsEmpty && this.ResourceStream != null)
{
Stream stream = this.ResourceStream;
if (this.ContainerCore != null && this.RequestOptions?.EncryptionOptions != null)
if (this.ContainerInternal != null && this.RequestOptions?.EncryptionOptions != null)
{
stream = await this.ContainerCore.ClientContext.EncryptItemAsync(
stream = await this.ContainerInternal.ClientContext.EncryptItemAsync(
stream,
this.RequestOptions.EncryptionOptions,
(DatabaseCore)this.ContainerCore.Database,
(DatabaseInternal)this.ContainerInternal.Database,
this.DiagnosticsContext,
cancellationToken);
}
Expand Down Expand Up @@ -387,7 +387,7 @@ public ItemBatchOperation(
OperationType operationType,
int operationIndex,
T resource,
ContainerCore containerCore,
ContainerInternal containerCore,
string id = null,
TransactionalBatchItemRequestOptions requestOptions = null)
: base(operationType, operationIndex, containerCore: containerCore, id: id, requestOptions: requestOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
{
for (int index = 0; index < serverRequest.Operations.Count; index++)
{
ContainerCore containerCore = serverRequest.Operations[index].ContainerCore;
ContainerInternal containerCore = serverRequest.Operations[index].ContainerInternal;
TransactionalBatchOperationResult result = response.results[index];
result.ResourceStream = await containerCore.ClientContext.DecryptItemAsync(
result.ResourceStream,
(DatabaseCore)containerCore.Database,
(DatabaseInternal)containerCore.Database,
responseMessage.DiagnosticsContext,
cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ internal sealed class PartitionSynchronizerCore : PartitionSynchronizer
internal static int DefaultDegreeOfParallelism = 25;
#pragma warning restore SA1401 // Fields should be private

private readonly ContainerCore container;
private readonly ContainerInternal container;
private readonly DocumentServiceLeaseContainer leaseContainer;
private readonly DocumentServiceLeaseManager leaseManager;
private readonly int degreeOfParallelism;
private readonly int maxBatchSize;

public PartitionSynchronizerCore(
ContainerCore container,
ContainerInternal container,
DocumentServiceLeaseContainer leaseContainer,
DocumentServiceLeaseManager leaseManager,
int degreeOfParallelism,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ internal sealed class ChangeFeedEstimatorCore : ChangeFeedProcessor

private readonly ChangesEstimationHandler initialEstimateDelegate;
private CancellationTokenSource shutdownCts;
private ContainerCore leaseContainer;
private ContainerInternal leaseContainer;
private string monitoredContainerRid;
private TimeSpan? estimatorPeriod = null;
private ContainerCore monitoredContainer;
private ContainerInternal monitoredContainer;
private DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager;
private FeedEstimator feedEstimator;
private RemainingWorkEstimator remainingWorkEstimator;
Expand Down Expand Up @@ -55,12 +55,12 @@ internal ChangeFeedEstimatorCore(

public void ApplyBuildConfiguration(
DocumentServiceLeaseStoreManager customDocumentServiceLeaseStoreManager,
ContainerCore leaseContainer,
ContainerInternal leaseContainer,
string monitoredContainerRid,
string instanceName,
ChangeFeedLeaseOptions changeFeedLeaseOptions,
ChangeFeedProcessorOptions changeFeedProcessorOptions,
ContainerCore monitoredContainer)
ContainerInternal monitoredContainer)
{
if (monitoredContainer == null) throw new ArgumentNullException(nameof(monitoredContainer));
if (leaseContainer == null && customDocumentServiceLeaseStoreManager == null) throw new ArgumentNullException(nameof(leaseContainer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,36 @@ public class ChangeFeedProcessorBuilder
{
private const string InMemoryDefaultHostName = "InMemory";

private readonly ContainerCore monitoredContainer;
private readonly ContainerInternal monitoredContainer;
private readonly ChangeFeedProcessor changeFeedProcessor;
private readonly ChangeFeedLeaseOptions changeFeedLeaseOptions;
private readonly Action<DocumentServiceLeaseStoreManager,
ContainerCore,
ContainerInternal,
string,
string,
ChangeFeedLeaseOptions,
ChangeFeedProcessorOptions,
ContainerCore> applyBuilderConfiguration;
ContainerInternal> applyBuilderConfiguration;

private ChangeFeedProcessorOptions changeFeedProcessorOptions;

private ContainerCore leaseContainer;
private ContainerInternal leaseContainer;
private string InstanceName;
private DocumentServiceLeaseStoreManager LeaseStoreManager;
private string monitoredContainerRid;
private bool isBuilt;

internal ChangeFeedProcessorBuilder(
string processorName,
ContainerCore container,
ContainerInternal container,
ChangeFeedProcessor changeFeedProcessor,
Action<DocumentServiceLeaseStoreManager,
ContainerCore,
ContainerInternal,
string,
string,
ChangeFeedLeaseOptions,
ChangeFeedProcessorOptions,
ContainerCore> applyBuilderConfiguration)
ContainerInternal> applyBuilderConfiguration)
{
this.changeFeedLeaseOptions = new ChangeFeedLeaseOptions();
this.changeFeedLeaseOptions.LeasePrefix = processorName;
Expand Down Expand Up @@ -162,7 +162,7 @@ public ChangeFeedProcessorBuilder WithLeaseContainer(Container leaseContainer)
if (this.leaseContainer != null) throw new InvalidOperationException("The builder already defined a lease container.");
if (this.LeaseStoreManager != null) throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");

this.leaseContainer = (ContainerInlineCore)leaseContainer;
this.leaseContainer = (ContainerInternal)leaseContainer;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
internal sealed class ChangeFeedProcessorCore<T> : ChangeFeedProcessor
{
private readonly ChangeFeedObserverFactory<T> observerFactory;
private ContainerCore leaseContainer;
private ContainerInternal leaseContainer;
private string monitoredContainerRid;
private string instanceName;
private ContainerCore monitoredContainer;
private ContainerInternal monitoredContainer;
private PartitionManager partitionManager;
private ChangeFeedLeaseOptions changeFeedLeaseOptions;
private ChangeFeedProcessorOptions changeFeedProcessorOptions;
Expand All @@ -37,12 +37,12 @@ public ChangeFeedProcessorCore(ChangeFeedObserverFactory<T> observerFactory)

public void ApplyBuildConfiguration(
DocumentServiceLeaseStoreManager customDocumentServiceLeaseStoreManager,
ContainerCore leaseContainer,
ContainerInternal leaseContainer,
string monitoredContainerRid,
string instanceName,
ChangeFeedLeaseOptions changeFeedLeaseOptions,
ChangeFeedProcessorOptions changeFeedProcessorOptions,
ContainerCore monitoredContainer)
ContainerInternal monitoredContainer)
{
if (monitoredContainer == null) throw new ArgumentNullException(nameof(monitoredContainer));
if (customDocumentServiceLeaseStoreManager == null && leaseContainer == null) throw new ArgumentNullException(nameof(leaseContainer));
Expand Down Expand Up @@ -87,7 +87,7 @@ private async Task InitializeAsync()

internal static async Task<DocumentServiceLeaseStoreManager> InitializeLeaseStoreManagerAsync(
DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager,
ContainerCore leaseContainer,
ContainerInternal leaseContainer,
string leaseContainerPrefix,
string instanceName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing

internal class FeedProcessorFactoryCore<T> : FeedProcessorFactory<T>
{
private readonly ContainerCore container;
private readonly ContainerInternal container;
private readonly ChangeFeedProcessorOptions changeFeedProcessorOptions;
private readonly DocumentServiceLeaseCheckpointer leaseCheckpointer;
private readonly CosmosSerializerCore serializerCore;

public FeedProcessorFactoryCore(
ContainerCore container,
ContainerInternal container,
ChangeFeedProcessorOptions changeFeedProcessorOptions,
DocumentServiceLeaseCheckpointer leaseCheckpointer,
CosmosSerializerCore serializerCore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public static async Task<string> GetMonitoredContainerRidAsync(
return suggestedMonitoredRid;
}

string containerRid = await ((ContainerCore)monitoredContainer).GetRIDAsync(cancellationToken);
string databaseRid = await ((DatabaseCore)((ContainerCore)monitoredContainer).Database).GetRIDAsync(cancellationToken);
string containerRid = await ((ContainerInternal)monitoredContainer).GetRIDAsync(cancellationToken);
string databaseRid = await ((DatabaseInternal)((ContainerInternal)monitoredContainer).Database).GetRIDAsync(cancellationToken);
return $"{databaseRid}_{containerRid}";
}

Expand All @@ -118,7 +118,7 @@ public static string GetLeasePrefix(
CultureInfo.InvariantCulture,
"{0}{1}_{2}",
optionsPrefix,
((ContainerCore)monitoredContainer).ClientContext.Client.Endpoint.Host,
((ContainerInternal)monitoredContainer).ClientContext.Client.Endpoint.Host,
monitoredContainerRid);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static ChangeFeedPartitionKeyResultSetIteratorCore BuildResultSetIterator
string partitionKeyRangeId,
string continuationToken,
int? maxItemCount,
ContainerCore container,
ContainerInternal container,
DateTime? startTime,
bool startFromBeginning)
{
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public virtual Task<AccountProperties> ReadAccountAsync()
/// <returns>Cosmos database proxy</returns>
public virtual Database GetDatabase(string id)
{
return new DatabaseInlineCore(new DatabaseCore(this.ClientContext, id));
return new DatabaseInlineCore(this.ClientContext, id);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public override Documents.ShouldRetryResult HandleChangeFeedNotModified(Response
}

public override async Task<Documents.ShouldRetryResult> HandleSplitAsync(
ContainerCore containerCore,
ContainerInternal containerCore,
ResponseMessage responseMessage,
CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static bool TryParse(
public abstract Documents.ShouldRetryResult HandleChangeFeedNotModified(ResponseMessage responseMessage);

public abstract Task<Documents.ShouldRetryResult> HandleSplitAsync(
ContainerCore containerCore,
ContainerInternal containerCore,
ResponseMessage responseMessage,
CancellationToken cancellationToken);
}
Expand Down
10 changes: 5 additions & 5 deletions Microsoft.Azure.Cosmos/src/FeedRangeIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ internal sealed class FeedRangeIteratorCore : FeedIteratorInternal
{
internal readonly FeedRangeInternal FeedRangeInternal;
internal FeedRangeContinuation FeedRangeContinuation { get; private set; }
private readonly ContainerCore containerCore;
private readonly ContainerInternal containerCore;
private readonly CosmosClientContext clientContext;
private readonly QueryRequestOptions queryRequestOptions;
private readonly AsyncLazy<TryCatch<string>> lazyContainerRid;
private bool hasMoreResultsInternal;

public static FeedRangeIteratorCore Create(
ContainerCore containerCore,
ContainerInternal containerCore,
FeedRangeInternal feedRangeInternal,
string continuation,
QueryRequestOptions options)
Expand Down Expand Up @@ -65,7 +65,7 @@ public static FeedRangeIteratorCore Create(
/// For unit tests
/// </summary>
internal FeedRangeIteratorCore(
ContainerCore containerCore,
ContainerInternal containerCore,
FeedRangeContinuation feedRangeContinuation,
QueryRequestOptions options)
: this(containerCore, feedRangeContinuation.FeedRange, options)
Expand All @@ -74,7 +74,7 @@ internal FeedRangeIteratorCore(
}

private FeedRangeIteratorCore(
ContainerCore containerCore,
ContainerInternal containerCore,
FeedRangeInternal feedRangeInternal,
QueryRequestOptions options)
: this(containerCore, options)
Expand All @@ -83,7 +83,7 @@ private FeedRangeIteratorCore(
}

private FeedRangeIteratorCore(
ContainerCore containerCore,
ContainerInternal containerCore,
QueryRequestOptions options)
{
this.containerCore = containerCore ?? throw new ArgumentNullException(nameof(containerCore));
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public virtual async Task<T> SendAsync<T>(
ResourceType resourceType,
OperationType operationType,
RequestOptions requestOptions,
ContainerCore cosmosContainerCore,
ContainerInternal cosmosContainerCore,
Cosmos.PartitionKey? partitionKey,
Stream streamPayload,
Action<RequestMessage> requestEnricher,
Expand Down Expand Up @@ -98,7 +98,7 @@ public virtual async Task<ResponseMessage> SendAsync(
ResourceType resourceType,
OperationType operationType,
RequestOptions requestOptions,
ContainerCore cosmosContainerCore,
ContainerInternal cosmosContainerCore,
Cosmos.PartitionKey? partitionKey,
Stream streamPayload,
Action<RequestMessage> requestEnricher,
Expand Down
Loading