diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Program.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Program.cs index ef30d0dd8b..1ac4294acb 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Program.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Program.cs @@ -189,6 +189,7 @@ private static ICTLScenario CreateScenario(WorkloadType workloadType) WorkloadType.ReadWriteQuery => new ReadWriteQueryScenario(), WorkloadType.ChangeFeedProcessor => new ChangeFeedProcessorScenario(), WorkloadType.ChangeFeedPull => new ChangeFeedPullScenario(), + WorkloadType.Query => new QueryScenario(), _ => throw new NotImplementedException($"No mapping for {workloadType}"), }; } diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedProcessorScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedProcessorScenario.cs index 51b3adbe72..a5f6a2a3ce 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedProcessorScenario.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedProcessorScenario.cs @@ -18,14 +18,14 @@ namespace CosmosCTL internal class ChangeFeedProcessorScenario : ICTLScenario { - private InitializationResult initializationResult; + private Utils.InitializationResult initializationResult; public async Task InitializeAsync( CTLConfig config, CosmosClient cosmosClient, ILogger logger) { - this.initializationResult = await CreateDatabaseAndContainerAsync(config, cosmosClient); + this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient); if (this.initializationResult.CreatedDatabase) { logger.LogInformation("Created database for execution"); @@ -39,7 +39,8 @@ public async Task InitializeAsync( if (config.PreCreatedDocuments > 0) { logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments); - await Utils.PopulateDocumentsAsync(config, logger, new List() { cosmosClient.GetContainer(config.Database, config.Collection) }); + IReadOnlyDictionary>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List() { cosmosClient.GetContainer(config.Database, config.Collection) }); + this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count; } } @@ -61,10 +62,17 @@ public async Task RunAsync( try { + object lockObject = new object(); + long documentTotal = 0; ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(config.Database, config.Collection) .GetChangeFeedProcessorBuilder("ctlProcessor", (IReadOnlyCollection docs, CancellationToken token) => { + lock (lockObject) + { + documentTotal += docs.Count; + } + metrics.Measure.Counter.Increment(documentCounter, docs.Count); return Task.CompletedTask; }) @@ -112,6 +120,14 @@ public async Task RunAsync( previousMin = sortedRange.Max; } + + if (config.PreCreatedDocuments > 0) + { + if (this.initializationResult.InsertedDocuments != documentTotal) + { + logger.LogError($"Expected to receive {this.initializationResult.InsertedDocuments} documents and got {documentTotal}"); + } + } } catch (Exception ex) { @@ -132,48 +148,6 @@ public async Task RunAsync( } } - private static async Task CreateDatabaseAndContainerAsync( - CTLConfig config, - CosmosClient cosmosClient) - { - InitializationResult result = new InitializationResult() - { - CreatedDatabase = false, - CreatedContainer = false - }; - - Database database; - try - { - database = await cosmosClient.GetDatabase(config.Database).ReadAsync(); - } - catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound) - { - DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput); - result.CreatedDatabase = true; - database = databaseResponse.Database; - } - - Container container; - try - { - container = await database.GetContainer(config.Collection).ReadContainerAsync(); - } - catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound) - { - await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}"); - result.CreatedContainer = true; - } - - return result; - } - - private struct InitializationResult - { - public bool CreatedDatabase; - public bool CreatedContainer; - } - private class SimpleItem { [JsonProperty("id")] diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs index ca4063da66..5049c2b3fb 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/ChangeFeedPullScenario.cs @@ -13,16 +13,18 @@ namespace CosmosCTL using App.Metrics; using Microsoft.Azure.Cosmos; using Microsoft.Extensions.Logging; + using App.Metrics.Gauge; internal class ChangeFeedPullScenario : ICTLScenario { - private InitializationResult initializationResult; + private Utils.InitializationResult initializationResult; + public async Task InitializeAsync( CTLConfig config, CosmosClient cosmosClient, ILogger logger) { - this.initializationResult = await CreateDatabaseAndContainerAsync(config, cosmosClient); + this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient); if (this.initializationResult.CreatedDatabase) { @@ -37,7 +39,8 @@ public async Task InitializeAsync( if (config.PreCreatedDocuments > 0) { logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments); - await Utils.PopulateDocumentsAsync(config, logger, new List() { cosmosClient.GetContainer(config.Database, config.Collection) }); + IReadOnlyDictionary>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List() { cosmosClient.GetContainer(config.Database, config.Collection) }); + this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count; } } @@ -51,13 +54,16 @@ public async Task RunAsync( { Stopwatch stopWatch = Stopwatch.StartNew(); + GaugeOptions documentGauge= new GaugeOptions { Name = "#Documents received", Context = loggingContextIdentifier }; + Container container = cosmosClient.GetContainer(config.Database, config.Collection); + while (stopWatch.Elapsed <= config.RunningTimeDurationAsTimespan) { - int documentTotal = 0; + long documentTotal = 0; string continuation = null; - Container container = cosmosClient.GetContainer(config.Database, config.Collection); - FeedIterator> changeFeedPull = container.GetChangeFeedIterator> - (ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental); + using FeedIterator> changeFeedPull + = container.GetChangeFeedIterator>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental); + try { while (changeFeedPull.HasMoreResults) @@ -71,65 +77,28 @@ public async Task RunAsync( } } - if (config.PreCreatedDocuments == documentTotal) - { - logger.LogInformation($"Success: The number of new documents match the number of pre-created documents: {config.PreCreatedDocuments}"); - } - else + metrics.Measure.Gauge.SetValue(documentGauge, documentTotal); + + if (config.PreCreatedDocuments > 0) { - logger.LogError($"The prepopulated documents and the new documents don't match. Preconfigured Docs = {config.PreCreatedDocuments}, New Documents = {documentTotal}"); - logger.LogError(continuation); + if (this.initializationResult.InsertedDocuments == documentTotal) + { + logger.LogInformation($"Success: The number of new documents match the number of pre-created documents: {this.initializationResult.InsertedDocuments}"); + } + else + { + logger.LogError($"The prepopulated documents and the change feed documents don't match. Preconfigured Docs = {this.initializationResult.InsertedDocuments}, Change feed Documents = {documentTotal}.{Environment.NewLine}{continuation}"); + } } } catch (Exception ex) { - logger.LogError(ex, "Failure while looping through new documents"); + metrics.Measure.Gauge.SetValue(documentGauge, documentTotal); + logger.LogError(ex, "Failure while looping through change feed documents"); } } stopWatch.Stop(); } - private static async Task CreateDatabaseAndContainerAsync( - CTLConfig config, - CosmosClient cosmosClient) - { - InitializationResult result = new InitializationResult() - { - CreatedDatabase = false, - CreatedContainer = false - }; - - Database database; - - try - { - database = await cosmosClient.GetDatabase(config.Database).ReadAsync(); - } - catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound) - { - DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput); - result.CreatedDatabase = true; - database = databaseResponse.Database; - } - - Container container; - - try - { - container = await database.GetContainer(config.Collection).ReadContainerAsync(); - } - catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound) - { - await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}"); - result.CreatedContainer = true; - } - - return result; - } - private struct InitializationResult - { - public bool CreatedDatabase; - public bool CreatedContainer; - } } } diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/QueryScenario.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/QueryScenario.cs new file mode 100644 index 0000000000..a0f94c96ec --- /dev/null +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Scenarios/QueryScenario.cs @@ -0,0 +1,265 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace CosmosCTL +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using System.Diagnostics; + using App.Metrics; + using Microsoft.Azure.Cosmos; + using Microsoft.Extensions.Logging; + using System.Text; + using App.Metrics.Gauge; + + internal class QueryScenario : ICTLScenario + { + private Utils.InitializationResult initializationResult; + + public async Task InitializeAsync( + CTLConfig config, + CosmosClient cosmosClient, + ILogger logger) + { + this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient); + + if (this.initializationResult.CreatedDatabase) + { + logger.LogInformation("Created database for execution"); + } + + if (this.initializationResult.CreatedContainer) + { + logger.LogInformation("Created collection for execution"); + } + + if (config.PreCreatedDocuments > 0) + { + logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments); + IReadOnlyDictionary>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List() { cosmosClient.GetContainer(config.Database, config.Collection) }); + this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count; + } + } + + public Task RunAsync( + CTLConfig config, + CosmosClient cosmosClient, + ILogger logger, + IMetrics metrics, + string loggingContextIdentifier, + CancellationToken cancellationToken) + { + return Task.WhenAll( + QueryScenario.ExecuteQueryAndGatherResultsAsync( + config, + cosmosClient, + logger, + metrics, + loggingContextIdentifier, + cancellationToken, + queryText: "select * from c", + queryName: "Star", + expectedResults: config.PreCreatedDocuments > 0 ? this.initializationResult.InsertedDocuments: 0), + QueryScenario.ExecuteQueryAndGatherResultsAsync( + config, + cosmosClient, + logger, + metrics, + loggingContextIdentifier, + cancellationToken, + queryText: "select * from c order by c.id", + queryName: "OrderBy", + expectedResults: config.PreCreatedDocuments > 0 ? this.initializationResult.InsertedDocuments : 0), + QueryScenario.ExecuteQueryAndGatherResultsAsync( + config, + cosmosClient, + logger, + metrics, + loggingContextIdentifier, + cancellationToken, + queryText: "select count(1) from c", + queryName: "Aggregates", + expectedResults: 1), + QueryScenario.ExecuteQueryWithContinuationAndGatherResultsAsync( + config, + cosmosClient, + logger, + metrics, + loggingContextIdentifier, + cancellationToken, + queryText: "select * from c", + queryName: "Star", + expectedResults: config.PreCreatedDocuments > 0 ? this.initializationResult.InsertedDocuments : 0), + QueryScenario.ExecuteQueryWithContinuationAndGatherResultsAsync( + config, + cosmosClient, + logger, + metrics, + loggingContextIdentifier, + cancellationToken, + queryText: "select * from c order by c.id", + queryName: "OrderBy", + expectedResults: config.PreCreatedDocuments > 0 ? this.initializationResult.InsertedDocuments : 0), + QueryScenario.ExecuteQueryWithContinuationAndGatherResultsAsync( + config, + cosmosClient, + logger, + metrics, + loggingContextIdentifier, + cancellationToken, + queryText: "select count(1) from c", + queryName: "Aggregates", + expectedResults: 1) + ); + } + + /// + /// Executes the query and does paging using continuations. + /// + private async static Task ExecuteQueryWithContinuationAndGatherResultsAsync(CTLConfig config, + CosmosClient cosmosClient, + ILogger logger, + IMetrics metrics, + string loggingContextIdentifier, + CancellationToken cancellationToken, + string queryText, + string queryName, + long expectedResults) + { + Stopwatch stopWatch = Stopwatch.StartNew(); + + GaugeOptions documentGauge = new GaugeOptions + { + Name = $"#{queryName} Query with CT received documents", + MeasurementUnit = Unit.Items, + Context = loggingContextIdentifier + }; + + Container container = cosmosClient.GetContainer(config.Database, config.Collection); + while (stopWatch.Elapsed <= config.RunningTimeDurationAsTimespan + && !cancellationToken.IsCancellationRequested) + { + // To really debug what happened on the query, having a list of all continuations would be useful. + List allContinuations = new List(); + long documentTotal = 0; + string continuation; + FeedIterator> query = container.GetItemQueryIterator>(queryText); + try + { + while (query.HasMoreResults) + { + FeedResponse> response = await query.ReadNextAsync(); + documentTotal += response.Count; + continuation = response.ContinuationToken; + allContinuations.Add(continuation); + if (continuation != null) + { + // Use continuation to paginate on the query instead of draining just the initial query + // This validates that we can indeed move forward with the continuation + query.Dispose(); + query = container.GetItemQueryIterator>(queryText, continuation); + } + } + + query.Dispose(); + + metrics.Measure.Gauge.SetValue(documentGauge, documentTotal); + + if (expectedResults > 0 && expectedResults != documentTotal) + { + StringBuilder errorDetail = new StringBuilder(); + errorDetail.AppendLine($"{queryName} Query with CT expected to read {expectedResults} but got {documentTotal}"); + foreach (string c in allContinuations) + { + errorDetail.AppendLine($"Continuation: {c}"); + } + + logger.LogError(errorDetail.ToString()); + } + } + catch (Exception ex) + { + metrics.Measure.Gauge.SetValue(documentGauge, documentTotal); + + StringBuilder errorDetail = new StringBuilder(); + errorDetail.AppendLine($"{queryName} Query with CT failure while looping through query."); + foreach (string c in allContinuations) + { + errorDetail.AppendLine($"Continuation: {c}"); + } + + logger.LogError(ex, errorDetail.ToString()); + } + } + + stopWatch.Stop(); + } + + /// + /// Executes the query and drains all results + /// + private async static Task ExecuteQueryAndGatherResultsAsync(CTLConfig config, + CosmosClient cosmosClient, + ILogger logger, + IMetrics metrics, + string loggingContextIdentifier, + CancellationToken cancellationToken, + string queryText, + string queryName, + long expectedResults) + { + Stopwatch stopWatch = Stopwatch.StartNew(); + + GaugeOptions documentGauge = new GaugeOptions + { + Name = $"#{queryName} Query received documents", + MeasurementUnit = Unit.Items, + Context = loggingContextIdentifier + }; + + Container container = cosmosClient.GetContainer(config.Database, config.Collection); + while (stopWatch.Elapsed <= config.RunningTimeDurationAsTimespan + && !cancellationToken.IsCancellationRequested) + { + long documentTotal = 0; + string continuation = null; + using FeedIterator> query = container.GetItemQueryIterator>(queryText); + try + { + while (query.HasMoreResults) + { + FeedResponse> response = await query.ReadNextAsync(); + documentTotal += response.Count; + continuation = response.ContinuationToken; + } + + metrics.Measure.Gauge.SetValue(documentGauge, documentTotal); + + if (expectedResults > 0 && expectedResults != documentTotal) + { + StringBuilder errorDetail = new StringBuilder(); + errorDetail.AppendLine($"{queryName} Query expected to read {expectedResults} but got {documentTotal}"); + errorDetail.AppendLine($"Last continuation: {continuation}"); + + logger.LogError(errorDetail.ToString()); + } + } + catch (Exception ex) + { + metrics.Measure.Gauge.SetValue(documentGauge, documentTotal); + + StringBuilder errorDetail = new StringBuilder(); + errorDetail.AppendLine($"{queryName} Query failure while looping through query."); + errorDetail.AppendLine($"Last continuation: {continuation}"); + + logger.LogError(ex, errorDetail.ToString()); + } + } + + stopWatch.Stop(); + } + } +} diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Utils.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Utils.cs index 40cf2e3914..ac406a49cd 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Utils.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/Utils.cs @@ -121,5 +121,48 @@ public static Dictionary GenerateDocument(string partitionKeyPro return document; } + + public static async Task CreateDatabaseAndContainerAsync( + CTLConfig config, + CosmosClient cosmosClient) + { + InitializationResult result = new InitializationResult() + { + CreatedDatabase = false, + CreatedContainer = false + }; + + Database database; + try + { + database = await cosmosClient.GetDatabase(config.Database).ReadAsync(); + } + catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound) + { + DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput); + result.CreatedDatabase = true; + database = databaseResponse.Database; + } + + Container container; + try + { + container = await database.GetContainer(config.Collection).ReadContainerAsync(); + } + catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound) + { + await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}"); + result.CreatedContainer = true; + } + + return result; + } + + public struct InitializationResult + { + public bool CreatedDatabase; + public bool CreatedContainer; + public long InsertedDocuments; + } } } diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/WorkloadType.cs b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/WorkloadType.cs index 5289ed2ba2..64e3d36fe6 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/CTL/WorkloadType.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/CTL/WorkloadType.cs @@ -8,6 +8,7 @@ public enum WorkloadType { ReadWriteQuery, ChangeFeedProcessor, - ChangeFeedPull + ChangeFeedPull, + Query } }