Skip to content

Commit

Permalink
[Internal] Conflicts: Fix conflicts should use the FeedRangeIterator (#…
Browse files Browse the repository at this point in the history
…1582)

* Wire conflicts

* Tests

* Emulator test
  • Loading branch information
ealsur authored and kirankumarkolli committed Jul 11, 2020
1 parent f5365dd commit 37e6e9d
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 30 deletions.
14 changes: 6 additions & 8 deletions Microsoft.Azure.Cosmos/src/Resource/Conflict/ConflictsCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,12 @@ public override FeedIterator GetConflictQueryStreamIterator(
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return new FeedIteratorCore(
clientContext: this.clientContext,
this.container.LinkUri,
resourceType: ResourceType.Conflict,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
options: requestOptions);

return FeedRangeIteratorCore.Create(
containerCore: this.container,
feedRangeInternal: null,
continuation: continuationToken,
options: requestOptions,
resourceType: ResourceType.Conflict);
}

public override FeedIterator<T> GetConflictQueryIterator<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -15,6 +16,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Documents;
using static Microsoft.Azure.Documents.RuntimeConstants;

/// <summary>
/// Cosmos feed stream iterator. This is used to get the query responses with a Stream content
Expand All @@ -27,19 +29,23 @@ internal sealed class FeedRangeIteratorCore : FeedIteratorInternal
private readonly CosmosClientContext clientContext;
private readonly QueryRequestOptions queryRequestOptions;
private readonly AsyncLazy<TryCatch<string>> lazyContainerRid;
private readonly ResourceType resourceType;
private readonly SqlQuerySpec querySpec;
private bool hasMoreResultsInternal;

public static FeedRangeIteratorCore Create(
ContainerInternal containerCore,
FeedRangeInternal feedRangeInternal,
string continuation,
QueryRequestOptions options)
QueryRequestOptions options,
ResourceType resourceType = ResourceType.Document,
QueryDefinition queryDefinition = null)
{
if (!string.IsNullOrEmpty(continuation))
{
if (FeedRangeContinuation.TryParse(continuation, out FeedRangeContinuation feedRangeContinuation))
{
return new FeedRangeIteratorCore(containerCore, feedRangeContinuation, options);
return new FeedRangeIteratorCore(containerCore, feedRangeContinuation, options, resourceType, queryDefinition);
}

// Backward compatible with old format
Expand All @@ -56,11 +62,11 @@ public static FeedRangeIteratorCore Create(
isMaxInclusive: false)
},
continuation);
return new FeedRangeIteratorCore(containerCore, feedRangeContinuation, options);
return new FeedRangeIteratorCore(containerCore, feedRangeContinuation, options, resourceType, queryDefinition);
}

feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForFullRange();
return new FeedRangeIteratorCore(containerCore, feedRangeInternal, options);
return new FeedRangeIteratorCore(containerCore, feedRangeInternal, options, resourceType, queryDefinition);
}

/// <summary>
Expand All @@ -69,29 +75,37 @@ public static FeedRangeIteratorCore Create(
internal FeedRangeIteratorCore(
ContainerInternal containerCore,
FeedRangeContinuation feedRangeContinuation,
QueryRequestOptions options)
: this(containerCore, feedRangeContinuation.FeedRange, options)
QueryRequestOptions options,
ResourceType resourceType,
QueryDefinition queryDefinition)
: this(containerCore, feedRangeContinuation.FeedRange, options, resourceType, queryDefinition)
{
this.FeedRangeContinuation = feedRangeContinuation;
}

private FeedRangeIteratorCore(
ContainerInternal containerCore,
FeedRangeInternal feedRangeInternal,
QueryRequestOptions options)
: this(containerCore, options)
QueryRequestOptions options,
ResourceType resourceType,
QueryDefinition queryDefinition)
: this(containerCore, options, resourceType, queryDefinition)
{
this.FeedRangeInternal = feedRangeInternal ?? throw new ArgumentNullException(nameof(feedRangeInternal));
}

private FeedRangeIteratorCore(
ContainerInternal containerCore,
QueryRequestOptions options)
QueryRequestOptions options,
ResourceType resourceType,
QueryDefinition queryDefinition)
{
this.containerCore = containerCore ?? throw new ArgumentNullException(nameof(containerCore));
this.clientContext = containerCore.ClientContext;
this.queryRequestOptions = options;
this.hasMoreResultsInternal = true;
this.resourceType = resourceType;
this.querySpec = queryDefinition?.ToSqlQuerySpec();
this.lazyContainerRid = new AsyncLazy<TryCatch<string>>(valueFactory: (innerCancellationToken) =>
{
return this.TryInitializeContainerRIdAsync(innerCancellationToken);
Expand Down Expand Up @@ -151,20 +165,32 @@ private async Task<ResponseMessage> ReadNextInternalAsync(
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
Stream stream = null;
OperationType operation = OperationType.ReadFeed;
if (this.querySpec != null)
{
stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType);
operation = OperationType.Query;
}

ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.containerCore.LinkUri,
resourceType: ResourceType.Document,
operationType: OperationType.ReadFeed,
resourceType: this.resourceType,
operationType: operation,
requestOptions: this.queryRequestOptions,
cosmosContainerCore: this.containerCore,
partitionKey: this.queryRequestOptions?.PartitionKey,
streamPayload: null,
streamPayload: stream,
requestEnricher: request =>
{
FeedRangeVisitor feedRangeVisitor = new FeedRangeVisitor(request);
this.FeedRangeInternal.Accept(feedRangeVisitor);
this.FeedRangeContinuation.Accept(feedRangeVisitor, QueryRequestOptions.FillContinuationToken);
if (this.querySpec != null)
{
request.Headers.Add(HttpConstants.HttpHeaders.ContentType, MediaTypes.QueryJson);
request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString);
}
},
diagnosticsContext: diagnostics,
cancellationToken: cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,25 @@ public async Task PermissionTests(bool directMode)
}
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task ConclictsTests(bool directMode)
{
CosmosClient client = directMode ? DirectCosmosClient : GatewayCosmosClient;
Database database = client.GetDatabase(DatabaseId);
Container container = await database.CreateContainerAsync(Guid.NewGuid().ToString(), "/id");
//Read All
List<ConflictProperties> results = await this.ToListAsync(
container.Conflicts.GetConflictQueryStreamIterator,
container.Conflicts.GetConflictQueryIterator<ConflictProperties>,
null,
CosmosBasicQueryTests.RequestOptions
);

// There is no way to simulate MM conflicts on the emulator but the list operations should work
}

private delegate FeedIterator<T> Query<T>(string querytext, string continuationToken, QueryRequestOptions options);
private delegate FeedIterator QueryStream(string querytext, string continuationToken, QueryRequestOptions options);

Expand Down
Loading

0 comments on commit 37e6e9d

Please sign in to comment.