Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch API: Add support of request options for transactional batch #1569

Merged
merged 10 commits into from
Jul 14, 2020
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="TransactionalBatchResponse"/> which contains the completion status and results of each operation.</returns>
public override Task<TransactionalBatchResponse> ExecuteAsync(
ItemRequestOptions requestOptions,
TransactionalBatchRequestOption requestOptions,
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
CancellationToken cancellationToken = default(CancellationToken))
{
return this.container.ClientContext.OperationHelperAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal PartitionKeyRangeBatchResponse(
}

itemBatchOperations.AddRange(serverResponse.Operations);
this.RequestCharge += serverResponse.RequestCharge;
this.Headers = serverResponse.Headers;

if (!string.IsNullOrEmpty(serverResponse.ErrorMessage))
{
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public abstract Task<TransactionalBatchResponse> ExecuteAsync(
/// <summary>
/// Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply to the batch.</param>
/// <param name="requestOptions">Options that apply specifically to batch request.</param>
/// <param name="cancellationToken">(Optional) Cancellation token representing request cancellation.</param>
/// <returns>An awaitable response which contains details of execution of the transactional batch.
/// <para>
Expand All @@ -253,7 +253,7 @@ public abstract Task<TransactionalBatchResponse> ExecuteAsync(
/// Use <see cref="TransactionalBatchResponse.IsSuccessStatusCode"/> on the response returned to ensure that the transactional batch succeeded.
/// </remarks>
public abstract Task<TransactionalBatchResponse> ExecuteAsync(
ItemRequestOptions requestOptions,
TransactionalBatchRequestOption requestOptions,
CancellationToken cancellationToken = default(CancellationToken));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using Microsoft.Azure.Documents;

/// <summary>
/// Cosmos batch request options
/// </summary>
public class TransactionalBatchRequestOption : RequestOptions
{
/// <summary>
/// Gets or sets the consistency level required for the request in the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The consistency level required for the request.
/// </value>
/// <remarks>
/// Azure Cosmos DB offers 5 different consistency levels. Strong, Bounded Staleness, Session, Consistent Prefix and Eventual - in order of strongest to weakest consistency. <see cref="ConnectionPolicy"/>
/// <para>
/// While this is set at a database account level, Azure Cosmos DB allows a developer to override the default consistency level
/// for each individual request.
/// </para>
/// </remarks>
public ConsistencyLevel? ConsistencyLevel
{
get => this.BaseConsistencyLevel;
set => this.BaseConsistencyLevel = value;
}

/// <summary>
/// Gets or sets the boolean to only return the headers and status code in
/// the Cosmos DB response for operations in the transactional batch request.
/// This removes the resource from the response. This reduces networking and CPU load by not sending
/// the resource back over the network and serializing it on the client.
/// </summary>
/// <remarks>
/// This is optimal for workloads where the returned resource is not used.
/// </remarks>
public bool? EnableContentResponseOnOperations { get; set; }

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
/// <param name="request">The <see cref="RequestMessage"/></param>
internal override void PopulateRequestOptions(RequestMessage request)
{
if (this.EnableContentResponseOnOperations.HasValue &&
!this.EnableContentResponseOnOperations.Value)
{
request.Headers.Add(HttpConstants.HttpHeaders.Prefer, HttpConstants.HttpHeaderValues.PreferReturnMinimal);
}

base.PopulateRequestOptions(request);
}
}
}
39 changes: 15 additions & 24 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ internal TransactionalBatchResponse(
: this(statusCode,
subStatusCode,
errorMessage,
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
new Headers(),
diagnosticsContext: diagnosticsContext,
operations: operations,
serializer: null)
Expand All @@ -65,9 +63,7 @@ private TransactionalBatchResponse(
HttpStatusCode statusCode,
SubStatusCodes subStatusCode,
string errorMessage,
double requestCharge,
TimeSpan? retryAfter,
string activityId,
Headers headers,
CosmosDiagnosticsContext diagnosticsContext,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializerCore serializer)
Expand All @@ -77,30 +73,33 @@ private TransactionalBatchResponse(
this.ErrorMessage = errorMessage;
this.Operations = operations;
this.SerializerCore = serializer;
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Headers = headers;
this.Diagnostics = diagnosticsContext.Diagnostics;
this.DiagnosticsContext = diagnosticsContext ?? throw new ArgumentNullException(nameof(diagnosticsContext));
}

/// <summary>
/// Gets the current <see cref="ResponseMessage"/> HTTP headers.
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public virtual Headers Headers { get; set; }

/// <summary>
/// Gets the ActivityId that identifies the server request made to execute the batch.
/// </summary>
public virtual string ActivityId { get; }
public virtual string ActivityId => this.Headers?.ActivityId;

/// <summary>
/// Gets the request charge for the batch request.
/// </summary>
/// <value>
/// The request charge measured in request units.
/// </value>
public virtual double RequestCharge { get; internal set; }
public virtual double RequestCharge => this.Headers?.RequestCharge ?? 0;

/// <summary>
/// Gets the amount of time to wait before retrying this or any other request within Cosmos container or collection due to throttling.
/// </summary>
public virtual TimeSpan? RetryAfter { get; }
public virtual TimeSpan? RetryAfter => this.Headers?.RetryAfter;

/// <summary>
/// Gets the completion status code of the batch request.
Expand Down Expand Up @@ -252,9 +251,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.ServerResponseDeserializationFailure,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -268,9 +265,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.StatusCode,
responseMessage.Headers.SubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -286,9 +281,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.InvalidServerResponse,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down Expand Up @@ -382,9 +375,7 @@ record =>
responseStatusCode,
responseSubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,38 @@ public async Task BatchItemETagAsync()
}
}

[TestMethod]
[Owner("rakkuma")]
[Description("Verify session token received from batch operations")]
public async Task BatchItemSessionTokenAsync()
{
Container container = BatchTestBase.JsonContainer;
await this.CreateJsonTestDocsAsync(container);

TestDoc testDocToCreate = BatchTestBase.PopulateTestDoc(this.PartitionKey1);

TestDoc testDocToReplace = this.GetTestDocCopy(this.TestDocPk1ExistingA);
testDocToReplace.Cost++;

ItemResponse<TestDoc> readResponse = await BatchTestBase.JsonContainer.ReadItemAsync<TestDoc>(
this.TestDocPk1ExistingA.Id,
BatchTestBase.GetPartitionKey(this.PartitionKey1));

ISessionToken beforeRequestSessionToken = BatchTestBase.GetSessionToken(readResponse.Headers.Session);

TransactionalBatchResponse batchResponse = await new BatchCore((ContainerInlineCore)container, BatchTestBase.GetPartitionKey(this.PartitionKey1))
.CreateItem(testDocToCreate)
.ReplaceItem(testDocToReplace.Id, testDocToReplace)
.ExecuteAsync();

BatchSinglePartitionKeyTests.VerifyBatchProcessed(batchResponse, numberOfOperations: 2);
Assert.AreEqual(HttpStatusCode.Created, batchResponse[0].StatusCode);
Assert.AreEqual(HttpStatusCode.OK, batchResponse[1].StatusCode);

ISessionToken afterRequestSessionToken = BatchTestBase.GetSessionToken(batchResponse.Headers.Session);
Assert.IsTrue(afterRequestSessionToken.LSN > beforeRequestSessionToken.LSN, "Response session token should be more than request session token");
}

[TestMethod]
[Owner("abpai")]
[Description("Verify TTL passed to binary passthrough batch operations flow as expected")]
Expand Down Expand Up @@ -320,7 +352,7 @@ public async Task BatchReadsOnlyAsync()

private async Task<TransactionalBatchResponse> RunCrudAsync(bool isStream, bool isSchematized, bool useEpk, Container container)
{
ItemRequestOptions batchOptions = null;
TransactionalBatchRequestOption batchOptions = null;
if (isSchematized)
{
await this.CreateSchematizedTestDocsAsync(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ protected static async Task VerifyNotFoundAsync(Container container, TestDoc doc
Assert.AreEqual(HttpStatusCode.NotFound, response.StatusCode);
}

protected static ItemRequestOptions GetUpdatedBatchRequestOptions(
ItemRequestOptions batchOptions = null,
protected static TransactionalBatchRequestOption GetUpdatedBatchRequestOptions(
TransactionalBatchRequestOption batchOptions = null,
bool isSchematized = false,
bool useEpk = false,
object partitionKey = null)
Expand All @@ -302,7 +302,7 @@ protected static ItemRequestOptions GetUpdatedBatchRequestOptions(
{
if (batchOptions == null)
{
batchOptions = new ItemRequestOptions();
batchOptions = new TransactionalBatchRequestOption();
}

Dictionary<string, object> properties = new Dictionary<string, object>()
Expand Down Expand Up @@ -412,6 +412,12 @@ protected static byte[] HexStringToBytes(string input)
return bytes;
}

internal static ISessionToken GetSessionToken(string sessionToken)
{
string[] tokenParts = sessionToken.Split(':');
return SessionTokenHelper.Parse(tokenParts[1]);
}

private static bool PopulateRequestOptions(RequestOptions requestOptions, TestDoc doc, bool isSchematized, bool useEpk, int? ttlInSeconds)
{
Dictionary<string, object> properties = new Dictionary<string, object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,52 @@ public async Task ItemBatchNoResponseTest()
this.ValidateResponse(response, noResponseItemCount);
}

[TestMethod]
public async Task TransactionalBatchNoResponseTest()
{
TransactionalBatchRequestOption requestOptions = new TransactionalBatchRequestOption()
{
EnableContentResponseOnOperations = false
};

string pkId = "TestTransactionalBatchId";
TransactionalBatch batch = this.container.CreateTransactionalBatch(new PartitionKey(pkId));

int noResponseItemCount = 100;
for (int i = 0; i < noResponseItemCount; i++)
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkId);
batch.CreateItem<ToDoActivity>(item);
}

TransactionalBatchResponse response = await batch.ExecuteAsync(requestOptions);
Assert.AreEqual(100, response.Count);
this.ValidateResponse(response, noResponseItemCount);

pkId = "TestTransactionalBatchId2";
batch = this.container.CreateTransactionalBatch(new PartitionKey(pkId));

noResponseItemCount = 0;
for (int i = 0; i < 10; i++)
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(pk: pkId);
batch.CreateItem<ToDoActivity>(item);
noResponseItemCount++;
ToDoActivity item2 = ToDoActivity.CreateRandomToDoActivity(pk: pkId);
item2.id = item.id;
batch.ReplaceItem<ToDoActivity>(item2.id, item2);
noResponseItemCount++;

// Even Read won't return response
batch.ReadItem(item2.id);
noResponseItemCount++;
}

response = await batch.ExecuteAsync(requestOptions);
Assert.AreEqual(noResponseItemCount, response.Count);
this.ValidateResponse(response, noResponseItemCount);
}

[TestMethod]
public async Task ItemBulkNoResponseTest()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CosmosDiagnosticsTests : BaseCosmosClientHelper
private Container Container = null;
private ContainerProperties containerSettings = null;

private static readonly ItemRequestOptions RequestOptionDisableDiagnostic = new ItemRequestOptions()
private static readonly TransactionalBatchRequestOption RequestOptionDisableDiagnostic = new TransactionalBatchRequestOption()
{
DiagnosticContextFactory = () => EmptyCosmosDiagnosticsContext.Singleton
};
Expand Down Expand Up @@ -398,7 +398,7 @@ public async Task BatchOperationDiagnostic(bool disableDiagnostics)
batch.ReadItem(createItems[i].id);
}

ItemRequestOptions requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchRequestOption requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchResponse response = await batch.ExecuteAsync(requestOptions);

Assert.IsNotNull(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ public class BatchUnitTests
public async Task BatchInvalidOptionsAsync()
{
Container container = BatchUnitTests.GetContainer();
List<ItemRequestOptions> badBatchOptionsList = new List<ItemRequestOptions>()
List<TransactionalBatchRequestOption> badBatchOptionsList = new List<TransactionalBatchRequestOption>()
{
new ItemRequestOptions()
new TransactionalBatchRequestOption()
{
IfMatchEtag = "cond",
},
new ItemRequestOptions()
new TransactionalBatchRequestOption()
{
IfNoneMatchEtag = "cond2",
}
};

foreach (ItemRequestOptions batchOptions in badBatchOptionsList)
foreach (TransactionalBatchRequestOption batchOptions in badBatchOptionsList)
{
BatchCore batch = (BatchCore)
new BatchCore((ContainerInternal)container, new Cosmos.PartitionKey(BatchUnitTests.PartitionKey1))
Expand Down Expand Up @@ -476,8 +476,8 @@ private static void VerifyBatchItemRequestOptionsAreEqual(TransactionalBatchItem
private static async Task VerifyExceptionThrownOnExecuteAsync(
TransactionalBatch batch,
Type expectedTypeOfException,
string expectedExceptionMessage = null,
ItemRequestOptions requestOptions = null)
string expectedExceptionMessage = null,
TransactionalBatchRequestOption requestOptions = null)
{
bool wasExceptionThrown = false;
try
Expand Down
Loading