Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch API: Add support of request options for transactional batch #1569

Merged
merged 10 commits into from
Jul 14, 2020
6 changes: 3 additions & 3 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
/// <summary>
/// Executes the batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply to the batch. Used only for EPK routing.</param>
/// <param name="requestOptions">Options that apply to the batch.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>An awaitable <see cref="TransactionalBatchResponse"/> which contains the completion status and results of each operation.</returns>
public virtual Task<TransactionalBatchResponse> ExecuteAsync(
RequestOptions requestOptions,
public override Task<TransactionalBatchResponse> ExecuteAsync(
TransactionalBatchRequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.container.ClientContext.OperationHelperAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal PartitionKeyRangeBatchResponse(
}

itemBatchOperations.AddRange(serverResponse.Operations);
this.RequestCharge += serverResponse.RequestCharge;
this.Headers = serverResponse.Headers;

if (!string.IsNullOrEmpty(serverResponse.ErrorMessage))
{
Expand Down
32 changes: 32 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,5 +223,37 @@ public abstract TransactionalBatch DeleteItem(
/// </remarks>
public abstract Task<TransactionalBatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
/// </summary>
/// <param name="requestOptions">Options that apply specifically to batch request.</param>
/// <param name="cancellationToken">(Optional) Cancellation token representing request cancellation.</param>
/// <returns>An awaitable response which contains details of execution of the transactional batch.
/// <para>
/// If the transactional batch executes successfully, the <see cref="TransactionalBatchResponse.StatusCode"/> on the response returned
/// will be set to <see cref="HttpStatusCode.OK"/>.
/// </para>
/// <para>
/// If an operation within the transactional batch fails during execution, no changes from the batch will be committed
/// and the status of the failing operation is made available in the <see cref="TransactionalBatchResponse.StatusCode"/>.
/// To get more details about the operation that failed, the response can be enumerated - this returns <see cref="TransactionalBatchOperationResult" />
/// instances corresponding to each operation in the transactional batch in the order they were added into the transactional batch.
/// For a result corresponding to an operation within the transactional batch, the <see cref="TransactionalBatchOperationResult.StatusCode"/> indicates
/// the status of the operation - if the operation was not executed or it was aborted due to the failure of another operation within the transactional batch,
/// the value of this field will be HTTP 424 (Failed Dependency); for the operation that caused the batch to abort, the value of this field will indicate
/// the cause of failure as a HTTP status code.
/// </para>
/// <para>
/// The <see cref="TransactionalBatchResponse.StatusCode"/> on the response returned may also have values such as HTTP 5xx in case of server errors and HTTP 429 (Too Many Requests).
/// </para>
/// </returns>
/// <remarks>
/// This API only throws on client side exceptions. This is to increase performance and prevent the overhead of throwing exceptions.
/// Use <see cref="TransactionalBatchResponse.IsSuccessStatusCode"/> on the response returned to ensure that the transactional batch succeeded.
/// </remarks>
public abstract Task<TransactionalBatchResponse> ExecuteAsync(
TransactionalBatchRequestOptions requestOptions,
CancellationToken cancellationToken = default(CancellationToken));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
/// <summary>
/// Cosmos batch request options.
/// </summary>
public class TransactionalBatchRequestOptions : RequestOptions
{
/// <summary>
/// Gets or sets the consistency level required for the request in the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The consistency level required for the request.
/// </value>
/// <remarks>
/// Azure Cosmos DB offers 5 different consistency levels. Strong, Bounded Staleness, Session, Consistent Prefix and Eventual - in order of strongest to weakest consistency. <see cref="ConnectionPolicy"/>
/// <para>
/// While this is set at a database account level, Azure Cosmos DB allows a developer to override the default consistency level
/// for each individual request.
/// </para>
/// </remarks>
public ConsistencyLevel? ConsistencyLevel
{
get => this.BaseConsistencyLevel;
set => this.BaseConsistencyLevel = value;
}

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
/// <param name="request">The <see cref="RequestMessage"/></param>
internal override void PopulateRequestOptions(RequestMessage request)
{
base.PopulateRequestOptions(request);
}
}
}
39 changes: 15 additions & 24 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ internal TransactionalBatchResponse(
: this(statusCode,
subStatusCode,
errorMessage,
requestCharge: 0,
retryAfter: null,
activityId: Guid.Empty.ToString(),
new Headers(),
diagnosticsContext: diagnosticsContext,
operations: operations,
serializer: null)
Expand All @@ -65,9 +63,7 @@ private TransactionalBatchResponse(
HttpStatusCode statusCode,
SubStatusCodes subStatusCode,
string errorMessage,
double requestCharge,
TimeSpan? retryAfter,
string activityId,
Headers headers,
CosmosDiagnosticsContext diagnosticsContext,
IReadOnlyList<ItemBatchOperation> operations,
CosmosSerializerCore serializer)
Expand All @@ -77,30 +73,33 @@ private TransactionalBatchResponse(
this.ErrorMessage = errorMessage;
this.Operations = operations;
this.SerializerCore = serializer;
this.RequestCharge = requestCharge;
this.RetryAfter = retryAfter;
this.ActivityId = activityId;
this.Headers = headers;
this.Diagnostics = diagnosticsContext.Diagnostics;
this.DiagnosticsContext = diagnosticsContext ?? throw new ArgumentNullException(nameof(diagnosticsContext));
}

/// <summary>
/// Gets the current HTTP headers.
/// </summary>
public virtual Headers Headers { get; internal set; }

/// <summary>
/// Gets the ActivityId that identifies the server request made to execute the batch.
/// </summary>
public virtual string ActivityId { get; }
public virtual string ActivityId => this.Headers?.ActivityId;

/// <summary>
/// Gets the request charge for the batch request.
/// </summary>
/// <value>
/// The request charge measured in request units.
/// </value>
public virtual double RequestCharge { get; internal set; }
public virtual double RequestCharge => this.Headers?.RequestCharge ?? 0;

/// <summary>
/// Gets the amount of time to wait before retrying this or any other request within Cosmos container or collection due to throttling.
/// </summary>
public virtual TimeSpan? RetryAfter { get; }
public virtual TimeSpan? RetryAfter => this.Headers?.RetryAfter;

/// <summary>
/// Gets the completion status code of the batch request.
Expand Down Expand Up @@ -252,9 +251,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.ServerResponseDeserializationFailure,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -268,9 +265,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
responseMessage.StatusCode,
responseMessage.Headers.SubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand All @@ -286,9 +281,7 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
HttpStatusCode.InternalServerError,
SubStatusCodes.Unknown,
ClientResources.InvalidServerResponse,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down Expand Up @@ -382,9 +375,7 @@ record =>
responseStatusCode,
responseSubStatusCode,
responseMessage.ErrorMessage,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.RetryAfter,
responseMessage.Headers.ActivityId,
responseMessage.Headers,
responseMessage.DiagnosticsContext,
serverRequest.Operations,
serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,38 @@ public async Task BatchItemETagAsync()
}
}

[TestMethod]
[Owner("rakkuma")]
[Description("Verify session token received from batch operations")]
public async Task BatchItemSessionTokenAsync()
{
Container container = BatchTestBase.JsonContainer;
await this.CreateJsonTestDocsAsync(container);

TestDoc testDocToCreate = BatchTestBase.PopulateTestDoc(this.PartitionKey1);

TestDoc testDocToReplace = this.GetTestDocCopy(this.TestDocPk1ExistingA);
testDocToReplace.Cost++;

ItemResponse<TestDoc> readResponse = await BatchTestBase.JsonContainer.ReadItemAsync<TestDoc>(
this.TestDocPk1ExistingA.Id,
BatchTestBase.GetPartitionKey(this.PartitionKey1));

ISessionToken beforeRequestSessionToken = BatchTestBase.GetSessionToken(readResponse.Headers.Session);

TransactionalBatchResponse batchResponse = await new BatchCore((ContainerInlineCore)container, BatchTestBase.GetPartitionKey(this.PartitionKey1))
.CreateItem(testDocToCreate)
.ReplaceItem(testDocToReplace.Id, testDocToReplace)
.ExecuteAsync();

BatchSinglePartitionKeyTests.VerifyBatchProcessed(batchResponse, numberOfOperations: 2);
Assert.AreEqual(HttpStatusCode.Created, batchResponse[0].StatusCode);
Assert.AreEqual(HttpStatusCode.OK, batchResponse[1].StatusCode);

ISessionToken afterRequestSessionToken = BatchTestBase.GetSessionToken(batchResponse.Headers.Session);
Assert.IsTrue(afterRequestSessionToken.LSN > beforeRequestSessionToken.LSN, "Response session token should be more than request session token");
}

[TestMethod]
[Owner("abpai")]
[Description("Verify TTL passed to binary passthrough batch operations flow as expected")]
Expand Down Expand Up @@ -320,7 +352,7 @@ public async Task BatchReadsOnlyAsync()

private async Task<TransactionalBatchResponse> RunCrudAsync(bool isStream, bool isSchematized, bool useEpk, Container container)
{
RequestOptions batchOptions = null;
TransactionalBatchRequestOptions batchOptions = null;
if (isSchematized)
{
await this.CreateSchematizedTestDocsAsync(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ protected static async Task VerifyNotFoundAsync(Container container, TestDoc doc
Assert.AreEqual(HttpStatusCode.NotFound, response.StatusCode);
}

protected static RequestOptions GetUpdatedBatchRequestOptions(
RequestOptions batchOptions = null,
protected static TransactionalBatchRequestOptions GetUpdatedBatchRequestOptions(
TransactionalBatchRequestOptions batchOptions = null,
bool isSchematized = false,
bool useEpk = false,
object partitionKey = null)
Expand All @@ -302,7 +302,7 @@ protected static RequestOptions GetUpdatedBatchRequestOptions(
{
if (batchOptions == null)
{
batchOptions = new RequestOptions();
batchOptions = new TransactionalBatchRequestOptions();
}

Dictionary<string, object> properties = new Dictionary<string, object>()
Expand Down Expand Up @@ -412,6 +412,12 @@ protected static byte[] HexStringToBytes(string input)
return bytes;
}

internal static ISessionToken GetSessionToken(string sessionToken)
{
string[] tokenParts = sessionToken.Split(':');
return SessionTokenHelper.Parse(tokenParts[1]);
}

private static bool PopulateRequestOptions(RequestOptions requestOptions, TestDoc doc, bool isSchematized, bool useEpk, int? ttlInSeconds)
{
Dictionary<string, object> properties = new Dictionary<string, object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CosmosDiagnosticsTests : BaseCosmosClientHelper
private Container Container = null;
private ContainerProperties containerSettings = null;

private static readonly ItemRequestOptions RequestOptionDisableDiagnostic = new ItemRequestOptions()
private static readonly TransactionalBatchRequestOptions RequestOptionDisableDiagnostic = new TransactionalBatchRequestOptions()
{
DiagnosticContextFactory = () => EmptyCosmosDiagnosticsContext.Singleton
};
Expand Down Expand Up @@ -398,8 +398,8 @@ public async Task BatchOperationDiagnostic(bool disableDiagnostics)
batch.ReadItem(createItems[i].id);
}

RequestOptions requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchResponse response = await ((BatchCore)batch).ExecuteAsync(requestOptions);
TransactionalBatchRequestOptions requestOptions = disableDiagnostics ? RequestOptionDisableDiagnostic : null;
TransactionalBatchResponse response = await batch.ExecuteAsync(requestOptions);

Assert.IsNotNull(response);
CosmosDiagnosticsTests.VerifyPointDiagnostics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ public class BatchUnitTests
public async Task BatchInvalidOptionsAsync()
{
Container container = BatchUnitTests.GetContainer();
List<RequestOptions> badBatchOptionsList = new List<RequestOptions>()
List<TransactionalBatchRequestOptions> badBatchOptionsList = new List<TransactionalBatchRequestOptions>()
{
new RequestOptions()
new TransactionalBatchRequestOptions()
{
IfMatchEtag = "cond",
},
new RequestOptions()
new TransactionalBatchRequestOptions()
{
IfNoneMatchEtag = "cond2",
}
};

foreach (RequestOptions batchOptions in badBatchOptionsList)
foreach (TransactionalBatchRequestOptions batchOptions in badBatchOptionsList)
{
BatchCore batch = (BatchCore)
new BatchCore((ContainerInternal)container, new Cosmos.PartitionKey(BatchUnitTests.PartitionKey1))
Expand Down Expand Up @@ -476,15 +476,15 @@ private static void VerifyBatchItemRequestOptionsAreEqual(TransactionalBatchItem
private static async Task VerifyExceptionThrownOnExecuteAsync(
TransactionalBatch batch,
Type expectedTypeOfException,
string expectedExceptionMessage = null,
RequestOptions requestOptions = null)
string expectedExceptionMessage = null,
TransactionalBatchRequestOptions requestOptions = null)
{
bool wasExceptionThrown = false;
try
{
if (requestOptions != null)
{
await ((BatchCore)batch).ExecuteAsync(requestOptions);
await batch.ExecuteAsync(requestOptions);
}
else
{
Expand Down
Loading