Skip to content

Commit

Permalink
Batch API: Adds support of request options for transactional batch (#…
Browse files Browse the repository at this point in the history
…1569)

* Extending support of item request options for transactional batch

* Adding TransactionalBatchRequestOption and test cases
  • Loading branch information
rakkuma authored Jul 14, 2020
1 parent e22f959 commit 9378a32
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 54 deletions.
6 changes: 3 additions & 3 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply to the batch. Used only for EPK routing.</param>
/// <param name="requestOptions">Options that apply to the batch.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="TransactionalBatchResponse"/> which contains the completion status and results of each operation.</returns>
public virtual Task<TransactionalBatchResponse> ExecuteAsync(
RequestOptions requestOptions,
public override Task<TransactionalBatchResponse> ExecuteAsync(
TransactionalBatchRequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.container.ClientContext.OperationHelperAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal PartitionKeyRangeBatchResponse(
}

itemBatchOperations.AddRange(serverResponse.Operations);
this.RequestCharge += serverResponse.RequestCharge;
this.Headers = serverResponse.Headers;

if (!string.IsNullOrEmpty(serverResponse.ErrorMessage))
{
Expand Down
32 changes: 32 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,5 +223,37 @@ public abstract TransactionalBatch DeleteItem(
/// </remarks>
public abstract Task<TransactionalBatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply specifically to batch request.</param>
/// <param name="cancellationToken">(Optional) Cancellation token representing request cancellation.</param>
/// <returns>An awaitable response which contains details of execution of the transactional batch.
/// <para>
/// If the transactional batch executes successfully, the <see cref="TransactionalBatchResponse.StatusCode"/> on the response returned
/// will be set to <see cref="HttpStatusCode.OK"/>.
/// </para>
/// <para>
/// If an operation within the transactional batch fails during execution, no changes from the batch will be committed
/// and the status of the failing operation is made available in the <see cref="TransactionalBatchResponse.StatusCode"/>.
/// To get more details about the operation that failed, the response can be enumerated - this returns <see cref="TransactionalBatchOperationResult" />
/// instances corresponding to each operation in the transactional batch in the order they were added into the transactional batch.
/// For a result corresponding to an operation within the transactional batch, the <see cref="TransactionalBatchOperationResult.StatusCode"/> indicates
/// the status of the operation - if the operation was not executed or it was aborted due to the failure of another operation within the transactional batch,
/// the value of this field will be HTTP 424 (Failed Dependency); for the operation that caused the batch to abort, the value of this field will indicate
/// the cause of failure as a HTTP status code.
/// </para>
/// <para>
/// The <see cref="TransactionalBatchResponse.StatusCode"/> on the response returned may also have values such as HTTP 5xx in case of server errors and HTTP 429 (Too Many Requests).
/// </para>
/// </returns>
/// <remarks>
/// This API only throws on client side exceptions. This is to increase performance and prevent the overhead of throwing exceptions.
/// Use <see cref="TransactionalBatchResponse.IsSuccessStatusCode"/> on the response returned to ensure that the transactional batch succeeded.
/// </remarks>
public abstract Task<TransactionalBatchResponse> ExecuteAsync(
TransactionalBatchRequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
/// <summary>
/// Cosmos batch request options.
/// </summary>
public class TransactionalBatchRequestOptions : RequestOptions
{
/// <summary>
/// Gets or sets the consistency level required for the request in the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The consistency level required for the request.
/// </value>
/// <remarks>
/// Azure Cosmos DB offers 5 different consistency levels. Strong, Bounded Staleness, Session, Consistent Prefix and Eventual - in order of strongest to weakest consistency. <see cref="ConnectionPolicy"/>
/// <para>
/// While this is set at a database account level, Azure Cosmos DB allows a developer to override the default consistency level
/// for each individual request.
/// </para>
/// </remarks>
public ConsistencyLevel? ConsistencyLevel
{
get => this.BaseConsistencyLevel;
set => this.BaseConsistencyLevel = value;
}

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
/// <param name="request">The <see cref="RequestMessage"/></param>
internal override void PopulateRequestOptions(RequestMessage request)
{
base.PopulateRequestOptions(request);
}
}
}
39 changes: 15 additions & 24 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ internal TransactionalBatchResponse(
: this(statusCode,
subStatusCode,
errorMessage,
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
new Headers(),
diagnosticsContext: diagnosticsContext,
operations: operations,
serializer: null)
Expand All @@ -65,9 +63,7 @@ private TransactionalBatchResponse(
HttpStatusCode statusCode,
SubStatusCodes subStatusCode,
string errorMessage,
double requestCharge,
TimeSpan? retryAfter,
string activityId,
Headers headers,
CosmosDiagnosticsContext diagnosticsContext,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializerCore serializer)
Expand All @@ -77,30 +73,33 @@ private TransactionalBatchResponse(
this.ErrorMessage = errorMessage;
this.Operations = operations;
this.SerializerCore = serializer;
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Headers = headers;
this.Diagnostics = diagnosticsContext.Diagnostics;
this.DiagnosticsContext = diagnosticsContext ?? throw new ArgumentNullException(nameof(diagnosticsContext));
}

/// <summary>
/// Gets the current HTTP headers.
/// </summary>
public virtual Headers Headers { get; internal set; }

/// <summary>
/// Gets the ActivityId that identifies the server request made to execute the batch.
/// </summary>
public virtual string ActivityId { get; }
public virtual string ActivityId => this.Headers?.ActivityId;

/// <summary>
/// Gets the request charge for the batch request.
/// </summary>
/// <value>
/// The request charge measured in request units.
/// </value>
public virtual double RequestCharge { get; internal set; }
public virtual double RequestCharge => this.Headers?.RequestCharge ?? 0;

/// <summary>
/// Gets the amount of time to wait before retrying this or any other request within Cosmos container or collection due to throttling.
/// </summary>
public virtual TimeSpan? RetryAfter { get; }
public virtual TimeSpan? RetryAfter => this.Headers?.RetryAfter;

/// <summary>
/// Gets the completion status code of the batch request.
Expand Down Expand Up @@ -252,9 +251,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.ServerResponseDeserializationFailure,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -268,9 +265,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.StatusCode,
responseMessage.Headers.SubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -286,9 +281,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.InvalidServerResponse,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down Expand Up @@ -382,9 +375,7 @@ record =>
responseStatusCode,
responseSubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,38 @@ public async Task BatchItemETagAsync()
}
}

[TestMethod]
[Owner("rakkuma")]
[Description("Verify session token received from batch operations")]
public async Task BatchItemSessionTokenAsync()
{
Container container = BatchTestBase.JsonContainer;
await this.CreateJsonTestDocsAsync(container);

TestDoc testDocToCreate = BatchTestBase.PopulateTestDoc(this.PartitionKey1);

TestDoc testDocToReplace = this.GetTestDocCopy(this.TestDocPk1ExistingA);
testDocToReplace.Cost++;

ItemResponse<TestDoc> readResponse = await BatchTestBase.JsonContainer.ReadItemAsync<TestDoc>(
this.TestDocPk1ExistingA.Id,
BatchTestBase.GetPartitionKey(this.PartitionKey1));

ISessionToken beforeRequestSessionToken = BatchTestBase.GetSessionToken(readResponse.Headers.Session);

TransactionalBatchResponse batchResponse = await new BatchCore((ContainerInlineCore)container, BatchTestBase.GetPartitionKey(this.PartitionKey1))
.CreateItem(testDocToCreate)
.ReplaceItem(testDocToReplace.Id, testDocToReplace)
.ExecuteAsync();

BatchSinglePartitionKeyTests.VerifyBatchProcessed(batchResponse, numberOfOperations: 2);
Assert.AreEqual(HttpStatusCode.Created, batchResponse[0].StatusCode);
Assert.AreEqual(HttpStatusCode.OK, batchResponse[1].StatusCode);

ISessionToken afterRequestSessionToken = BatchTestBase.GetSessionToken(batchResponse.Headers.Session);
Assert.IsTrue(afterRequestSessionToken.LSN > beforeRequestSessionToken.LSN, "Response session token should be more than request session token");
}

[TestMethod]
[Owner("abpai")]
[Description("Verify TTL passed to binary passthrough batch operations flow as expected")]
Expand Down Expand Up @@ -320,7 +352,7 @@ public async Task BatchReadsOnlyAsync()

private async Task<TransactionalBatchResponse> RunCrudAsync(bool isStream, bool isSchematized, bool useEpk, Container container)
{
RequestOptions batchOptions = null;
TransactionalBatchRequestOptions batchOptions = null;
if (isSchematized)
{
await this.CreateSchematizedTestDocsAsync(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ protected static async Task VerifyNotFoundAsync(Container container, TestDoc doc
Assert.AreEqual(HttpStatusCode.NotFound, response.StatusCode);
}

protected static RequestOptions GetUpdatedBatchRequestOptions(
RequestOptions batchOptions = null,
protected static TransactionalBatchRequestOptions GetUpdatedBatchRequestOptions(
TransactionalBatchRequestOptions batchOptions = null,
bool isSchematized = false,
bool useEpk = false,
object partitionKey = null)
Expand All @@ -302,7 +302,7 @@ protected static RequestOptions GetUpdatedBatchRequestOptions(
{
if (batchOptions == null)
{
batchOptions = new RequestOptions();
batchOptions = new TransactionalBatchRequestOptions();
}

Dictionary<string, object> properties = new Dictionary<string, object>()
Expand Down Expand Up @@ -412,6 +412,12 @@ protected static byte[] HexStringToBytes(string input)
return bytes;
}

internal static ISessionToken GetSessionToken(string sessionToken)
{
string[] tokenParts = sessionToken.Split(':');
return SessionTokenHelper.Parse(tokenParts[1]);
}

private static bool PopulateRequestOptions(RequestOptions requestOptions, TestDoc doc, bool isSchematized, bool useEpk, int? ttlInSeconds)
{
Dictionary<string, object> properties = new Dictionary<string, object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CosmosDiagnosticsTests : BaseCosmosClientHelper
private Container Container = null;
private ContainerProperties containerSettings = null;

private static readonly ItemRequestOptions RequestOptionDisableDiagnostic = new ItemRequestOptions()
private static readonly TransactionalBatchRequestOptions RequestOptionDisableDiagnostic = new TransactionalBatchRequestOptions()
{
DiagnosticContextFactory = () => EmptyCosmosDiagnosticsContext.Singleton
};
Expand Down Expand Up @@ -398,8 +398,8 @@ public async Task BatchOperationDiagnostic(bool disableDiagnostics)
batch.ReadItem(createItems[i].id);
}

RequestOptions requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchResponse response = await ((BatchCore)batch).ExecuteAsync(requestOptions);
TransactionalBatchRequestOptions requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchResponse response = await batch.ExecuteAsync(requestOptions);

Assert.IsNotNull(response);
CosmosDiagnosticsTests.VerifyPointDiagnostics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ public class BatchUnitTests
public async Task BatchInvalidOptionsAsync()
{
Container container = BatchUnitTests.GetContainer();
List<RequestOptions> badBatchOptionsList = new List<RequestOptions>()
List<TransactionalBatchRequestOptions> badBatchOptionsList = new List<TransactionalBatchRequestOptions>()
{
new RequestOptions()
new TransactionalBatchRequestOptions()
{
IfMatchEtag = "cond",
},
new RequestOptions()
new TransactionalBatchRequestOptions()
{
IfNoneMatchEtag = "cond2",
}
};

foreach (RequestOptions batchOptions in badBatchOptionsList)
foreach (TransactionalBatchRequestOptions batchOptions in badBatchOptionsList)
{
BatchCore batch = (BatchCore)
new BatchCore((ContainerInternal)container, new Cosmos.PartitionKey(BatchUnitTests.PartitionKey1))
Expand Down Expand Up @@ -476,15 +476,15 @@ private static void VerifyBatchItemRequestOptionsAreEqual(TransactionalBatchItem
private static async Task VerifyExceptionThrownOnExecuteAsync(
TransactionalBatch batch,
Type expectedTypeOfException,
string expectedExceptionMessage = null,
RequestOptions requestOptions = null)
string expectedExceptionMessage = null,
TransactionalBatchRequestOptions requestOptions = null)
{
bool wasExceptionThrown = false;
try
{
if (requestOptions != null)
{
await ((BatchCore)batch).ExecuteAsync(requestOptions);
await batch.ExecuteAsync(requestOptions);
}
else
{
Expand Down
Loading

0 comments on commit 9378a32

Please sign in to comment.