Skip to content

Commit

Permalink
Adding TransactionalBatchRequestOption and test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Rakesh Kumar <rakkuma@microsoft.com>
  • Loading branch information
rakkuma committed Jul 7, 2020
1 parent 0aa0ba8 commit ebc75f5
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="TransactionalBatchResponse"/> which contains the completion status and results of each operation.</returns>
public override Task<TransactionalBatchResponse> ExecuteAsync(
ItemRequestOptions requestOptions,
TransactionalBatchRequestOption requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.container.ClientContext.OperationHelperAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal PartitionKeyRangeBatchResponse(
}

itemBatchOperations.AddRange(serverResponse.Operations);
this.RequestCharge += serverResponse.RequestCharge;
this.Headers = serverResponse.Headers;

if (!string.IsNullOrEmpty(serverResponse.ErrorMessage))
{
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public abstract Task<TransactionalBatchResponse> ExecuteAsync(
/// <summary>
/// Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply to the batch.</param>
/// <param name="requestOptions">Options that apply specifically to batch request.</param>
/// <param name="cancellationToken">(Optional) Cancellation token representing request cancellation.</param>
/// <returns>An awaitable response which contains details of execution of the transactional batch.
/// <para>
Expand All @@ -253,7 +253,7 @@ public abstract Task<TransactionalBatchResponse> ExecuteAsync(
/// Use <see cref="TransactionalBatchResponse.IsSuccessStatusCode"/> on the response returned to ensure that the transactional batch succeeded.
/// </remarks>
public abstract Task<TransactionalBatchResponse> ExecuteAsync(
ItemRequestOptions requestOptions,
TransactionalBatchRequestOption requestOptions,
CancellationToken cancellationToken = default(CancellationToken));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using Microsoft.Azure.Documents;

/// <summary>
/// Cosmos batch request options
/// </summary>
public class TransactionalBatchRequestOption : RequestOptions
{
/// <summary>
/// Gets or sets the consistency level required for the request in the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The consistency level required for the request.
/// </value>
/// <remarks>
/// Azure Cosmos DB offers 5 different consistency levels. Strong, Bounded Staleness, Session, Consistent Prefix and Eventual - in order of strongest to weakest consistency. <see cref="ConnectionPolicy"/>
/// <para>
/// While this is set at a database account level, Azure Cosmos DB allows a developer to override the default consistency level
/// for each individual request.
/// </para>
/// </remarks>
public ConsistencyLevel? ConsistencyLevel
{
get => this.BaseConsistencyLevel;
set => this.BaseConsistencyLevel = value;
}

/// <summary>
/// Gets or sets the boolean to only return the headers and status code in
/// the Cosmos DB response for operations in the transactional batch request.
/// This removes the resource from the response. This reduces networking and CPU load by not sending
/// the resource back over the network and serializing it on the client.
/// </summary>
/// <remarks>
/// This is optimal for workloads where the returned resource is not used.
/// </remarks>
public bool? EnableContentResponseOnOperations { get; set; }

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
/// <param name="request">The <see cref="RequestMessage"/></param>
internal override void PopulateRequestOptions(RequestMessage request)
{
if (this.EnableContentResponseOnOperations.HasValue &&
!this.EnableContentResponseOnOperations.Value)
{
request.Headers.Add(HttpConstants.HttpHeaders.Prefer, HttpConstants.HttpHeaderValues.PreferReturnMinimal);
}

base.PopulateRequestOptions(request);
}
}
}
39 changes: 15 additions & 24 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ internal TransactionalBatchResponse(
: this(statusCode,
subStatusCode,
errorMessage,
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
new Headers(),
diagnosticsContext: diagnosticsContext,
operations: operations,
serializer: null)
Expand All @@ -65,9 +63,7 @@ private TransactionalBatchResponse(
HttpStatusCode statusCode,
SubStatusCodes subStatusCode,
string errorMessage,
double requestCharge,
TimeSpan? retryAfter,
string activityId,
Headers headers,
CosmosDiagnosticsContext diagnosticsContext,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializerCore serializer)
Expand All @@ -77,30 +73,33 @@ private TransactionalBatchResponse(
this.ErrorMessage = errorMessage;
this.Operations = operations;
this.SerializerCore = serializer;
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Headers = headers;
this.Diagnostics = diagnosticsContext.Diagnostics;
this.DiagnosticsContext = diagnosticsContext ?? throw new ArgumentNullException(nameof(diagnosticsContext));
}

/// <summary>
/// Gets the current <see cref="ResponseMessage"/> HTTP headers.
/// </summary>
public virtual Headers Headers { get; set; }

/// <summary>
/// Gets the ActivityId that identifies the server request made to execute the batch.
/// </summary>
public virtual string ActivityId { get; }
public virtual string ActivityId => this.Headers?.ActivityId;

/// <summary>
/// Gets the request charge for the batch request.
/// </summary>
/// <value>
/// The request charge measured in request units.
/// </value>
public virtual double RequestCharge { get; internal set; }
public virtual double RequestCharge => this.Headers?.RequestCharge ?? 0;

/// <summary>
/// Gets the amount of time to wait before retrying this or any other request within Cosmos container or collection due to throttling.
/// </summary>
public virtual TimeSpan? RetryAfter { get; }
public virtual TimeSpan? RetryAfter => this.Headers?.RetryAfter;

/// <summary>
/// Gets the completion status code of the batch request.
Expand Down Expand Up @@ -252,9 +251,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.ServerResponseDeserializationFailure,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -268,9 +265,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.StatusCode,
responseMessage.Headers.SubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -286,9 +281,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.InvalidServerResponse,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down Expand Up @@ -382,9 +375,7 @@ record =>
responseStatusCode,
responseSubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,38 @@ public async Task BatchItemETagAsync()
}
}

[TestMethod]
[Owner("rakkuma")]
[Description("Verify session token received from batch operations")]
public async Task BatchItemSessionTokenAsync()
{
Container container = BatchTestBase.JsonContainer;
await this.CreateJsonTestDocsAsync(container);

TestDoc testDocToCreate = BatchTestBase.PopulateTestDoc(this.PartitionKey1);

TestDoc testDocToReplace = this.GetTestDocCopy(this.TestDocPk1ExistingA);
testDocToReplace.Cost++;

ItemResponse<TestDoc> readResponse = await BatchTestBase.JsonContainer.ReadItemAsync<TestDoc>(
this.TestDocPk1ExistingA.Id,
BatchTestBase.GetPartitionKey(this.PartitionKey1));

ISessionToken beforeRequestSessionToken = BatchTestBase.GetSessionToken(readResponse.Headers.Session);

TransactionalBatchResponse batchResponse = await new BatchCore((ContainerInlineCore)container, BatchTestBase.GetPartitionKey(this.PartitionKey1))
.CreateItem(testDocToCreate)
.ReplaceItem(testDocToReplace.Id, testDocToReplace)
.ExecuteAsync();

BatchSinglePartitionKeyTests.VerifyBatchProcessed(batchResponse, numberOfOperations: 2);
Assert.AreEqual(HttpStatusCode.Created, batchResponse[0].StatusCode);
Assert.AreEqual(HttpStatusCode.OK, batchResponse[1].StatusCode);

ISessionToken afterRequestSessionToken = BatchTestBase.GetSessionToken(batchResponse.Headers.Session);
Assert.IsTrue(afterRequestSessionToken.LSN > beforeRequestSessionToken.LSN, "Response session token should be more than request session token");
}

[TestMethod]
[Owner("abpai")]
[Description("Verify TTL passed to binary passthrough batch operations flow as expected")]
Expand Down Expand Up @@ -320,7 +352,7 @@ public async Task BatchReadsOnlyAsync()

private async Task<TransactionalBatchResponse> RunCrudAsync(bool isStream, bool isSchematized, bool useEpk, Container container)
{
ItemRequestOptions batchOptions = null;
TransactionalBatchRequestOption batchOptions = null;
if (isSchematized)
{
await this.CreateSchematizedTestDocsAsync(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ protected static async Task VerifyNotFoundAsync(Container container, TestDoc doc
Assert.AreEqual(HttpStatusCode.NotFound, response.StatusCode);
}

protected static ItemRequestOptions GetUpdatedBatchRequestOptions(
ItemRequestOptions batchOptions = null,
protected static TransactionalBatchRequestOption GetUpdatedBatchRequestOptions(
TransactionalBatchRequestOption batchOptions = null,
bool isSchematized = false,
bool useEpk = false,
object partitionKey = null)
Expand All @@ -302,7 +302,7 @@ protected static ItemRequestOptions GetUpdatedBatchRequestOptions(
{
if (batchOptions == null)
{
batchOptions = new ItemRequestOptions();
batchOptions = new TransactionalBatchRequestOption();
}

Dictionary<string, object> properties = new Dictionary<string, object>()
Expand Down Expand Up @@ -412,6 +412,12 @@ protected static byte[] HexStringToBytes(string input)
return bytes;
}

internal static ISessionToken GetSessionToken(string sessionToken)
{
string[] tokenParts = sessionToken.Split(':');
return SessionTokenHelper.Parse(tokenParts[1]);
}

private static bool PopulateRequestOptions(RequestOptions requestOptions, TestDoc doc, bool isSchematized, bool useEpk, int? ttlInSeconds)
{
Dictionary<string, object> properties = new Dictionary<string, object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,52 @@ public async Task ItemBatchNoResponseTest()
this.ValidateResponse(response, noResponseItemCount);
}

[TestMethod]
public async Task TransactionalBatchNoResponseTest()
{
TransactionalBatchRequestOption requestOptions = new TransactionalBatchRequestOption()
{
EnableContentResponseOnOperations = false
};

string pkId = "TestTransactionalBatchId";
TransactionalBatch batch = this.container.CreateTransactionalBatch(new PartitionKey(pkId));

int noResponseItemCount = 100;
for (int i = 0; i < noResponseItemCount; i++)
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkId);
batch.CreateItem<ToDoActivity>(item);
}

TransactionalBatchResponse response = await batch.ExecuteAsync(requestOptions);
Assert.AreEqual(100, response.Count);
this.ValidateResponse(response, noResponseItemCount);

pkId = "TestTransactionalBatchId2";
batch = this.container.CreateTransactionalBatch(new PartitionKey(pkId));

noResponseItemCount = 0;
for (int i = 0; i < 10; i++)
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkId);
batch.CreateItem<ToDoActivity>(item);
noResponseItemCount++;
ToDoActivity item2 = ToDoActivity.CreateRandomToDoActivity(pk: pkId);
item2.id = item.id;
batch.ReplaceItem<ToDoActivity>(item2.id, item2);
noResponseItemCount++;

// Even Read won't return response
batch.ReadItem(item2.id);
noResponseItemCount++;
}

response = await batch.ExecuteAsync(requestOptions);
Assert.AreEqual(noResponseItemCount, response.Count);
this.ValidateResponse(response, noResponseItemCount);
}

[TestMethod]
public async Task ItemBulkNoResponseTest()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CosmosDiagnosticsTests : BaseCosmosClientHelper
private Container Container = null;
private ContainerProperties containerSettings = null;

private static readonly ItemRequestOptions RequestOptionDisableDiagnostic = new ItemRequestOptions()
private static readonly TransactionalBatchRequestOption RequestOptionDisableDiagnostic = new TransactionalBatchRequestOption()
{
DiagnosticContextFactory = () => EmptyCosmosDiagnosticsContext.Singleton
};
Expand Down Expand Up @@ -398,7 +398,7 @@ public async Task BatchOperationDiagnostic(bool disableDiagnostics)
batch.ReadItem(createItems[i].id);
}

ItemRequestOptions requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchRequestOption requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchResponse response = await batch.ExecuteAsync(requestOptions);

Assert.IsNotNull(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ public class BatchUnitTests
public async Task BatchInvalidOptionsAsync()
{
Container container = BatchUnitTests.GetContainer();
List<ItemRequestOptions> badBatchOptionsList = new List<ItemRequestOptions>()
List<TransactionalBatchRequestOption> badBatchOptionsList = new List<TransactionalBatchRequestOption>()
{
new ItemRequestOptions()
new TransactionalBatchRequestOption()
{
IfMatchEtag = "cond",
},
new ItemRequestOptions()
new TransactionalBatchRequestOption()
{
IfNoneMatchEtag = "cond2",
}
};

foreach (ItemRequestOptions batchOptions in badBatchOptionsList)
foreach (TransactionalBatchRequestOption batchOptions in badBatchOptionsList)
{
BatchCore batch = (BatchCore)
new BatchCore((ContainerInternal)container, new Cosmos.PartitionKey(BatchUnitTests.PartitionKey1))
Expand Down Expand Up @@ -476,8 +476,8 @@ private static void VerifyBatchItemRequestOptionsAreEqual(TransactionalBatchItem
private static async Task VerifyExceptionThrownOnExecuteAsync(
TransactionalBatch batch,
Type expectedTypeOfException,
string expectedExceptionMessage = null,
ItemRequestOptions requestOptions = null)
string expectedExceptionMessage = null,
TransactionalBatchRequestOption requestOptions = null)
{
bool wasExceptionThrown = false;
try
Expand Down
Loading

0 comments on commit ebc75f5

Please sign in to comment.