Skip to content

Commit

Permalink
[Internal] CTL: Adds query scenario (#2701)
Browse files Browse the repository at this point in the history
* polishing cf pull

* Adding check to CFP

* Refactor helper classes

* Base query

* metrics

* Add continuations to the error

* Adding 3 queries

* Changing as gauge

* remove async/await

* comments

* adding full drain
  • Loading branch information
ealsur committed Sep 14, 2021
1 parent 9561c55 commit f5e1ae6
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 103 deletions.
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos.Samples/Tools/CTL/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ private static ICTLScenario CreateScenario(WorkloadType workloadType)
WorkloadType.ReadWriteQuery => new ReadWriteQueryScenario(),
WorkloadType.ChangeFeedProcessor => new ChangeFeedProcessorScenario(),
WorkloadType.ChangeFeedPull => new ChangeFeedPullScenario(),
WorkloadType.Query => new QueryScenario(),
_ => throw new NotImplementedException($"No mapping for {workloadType}"),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ namespace CosmosCTL

internal class ChangeFeedProcessorScenario : ICTLScenario
{
private InitializationResult initializationResult;
private Utils.InitializationResult initializationResult;

public async Task InitializeAsync(
CTLConfig config,
CosmosClient cosmosClient,
ILogger logger)
{
this.initializationResult = await CreateDatabaseAndContainerAsync(config, cosmosClient);
this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient);
if (this.initializationResult.CreatedDatabase)
{
logger.LogInformation("Created database for execution");
Expand All @@ -39,7 +39,8 @@ public async Task InitializeAsync(
if (config.PreCreatedDocuments > 0)
{
logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments);
await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
IReadOnlyDictionary<string, IReadOnlyList<Dictionary<string, string>>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count;
}
}

Expand All @@ -61,10 +62,17 @@ public async Task RunAsync(

try
{
object lockObject = new object();
long documentTotal = 0;
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(config.Database, config.Collection)
.GetChangeFeedProcessorBuilder<SimpleItem>("ctlProcessor",
(IReadOnlyCollection<SimpleItem> docs, CancellationToken token) =>
{
lock (lockObject)
{
documentTotal += docs.Count;
}
metrics.Measure.Counter.Increment(documentCounter, docs.Count);
return Task.CompletedTask;
})
Expand Down Expand Up @@ -112,6 +120,14 @@ public async Task RunAsync(

previousMin = sortedRange.Max;
}

if (config.PreCreatedDocuments > 0)
{
if (this.initializationResult.InsertedDocuments != documentTotal)
{
logger.LogError($"Expected to receive {this.initializationResult.InsertedDocuments} documents and got {documentTotal}");
}
}
}
catch (Exception ex)
{
Expand All @@ -132,48 +148,6 @@ public async Task RunAsync(
}
}

private static async Task<InitializationResult> CreateDatabaseAndContainerAsync(
CTLConfig config,
CosmosClient cosmosClient)
{
InitializationResult result = new InitializationResult()
{
CreatedDatabase = false,
CreatedContainer = false
};

Database database;
try
{
database = await cosmosClient.GetDatabase(config.Database).ReadAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput);
result.CreatedDatabase = true;
database = databaseResponse.Database;
}

Container container;
try
{
container = await database.GetContainer(config.Collection).ReadContainerAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}");
result.CreatedContainer = true;
}

return result;
}

private struct InitializationResult
{
public bool CreatedDatabase;
public bool CreatedContainer;
}

private class SimpleItem
{
[JsonProperty("id")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ namespace CosmosCTL
using App.Metrics;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;
using App.Metrics.Gauge;

internal class ChangeFeedPullScenario : ICTLScenario
{
private InitializationResult initializationResult;
private Utils.InitializationResult initializationResult;

public async Task InitializeAsync(
CTLConfig config,
CosmosClient cosmosClient,
ILogger logger)
{
this.initializationResult = await CreateDatabaseAndContainerAsync(config, cosmosClient);
this.initializationResult = await Utils.CreateDatabaseAndContainerAsync(config, cosmosClient);

if (this.initializationResult.CreatedDatabase)
{
Expand All @@ -37,7 +39,8 @@ public async Task InitializeAsync(
if (config.PreCreatedDocuments > 0)
{
logger.LogInformation("Pre-populating {0} documents", config.PreCreatedDocuments);
await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
IReadOnlyDictionary<string, IReadOnlyList<Dictionary<string, string>>> insertedDocuments = await Utils.PopulateDocumentsAsync(config, logger, new List<Container>() { cosmosClient.GetContainer(config.Database, config.Collection) });
this.initializationResult.InsertedDocuments = insertedDocuments[config.Collection].Count;
}
}

Expand All @@ -51,13 +54,16 @@ public async Task RunAsync(
{
Stopwatch stopWatch = Stopwatch.StartNew();

GaugeOptions documentGauge= new GaugeOptions { Name = "#Documents received", Context = loggingContextIdentifier };
Container container = cosmosClient.GetContainer(config.Database, config.Collection);

while (stopWatch.Elapsed <= config.RunningTimeDurationAsTimespan)
{
int documentTotal = 0;
long documentTotal = 0;
string continuation = null;
Container container = cosmosClient.GetContainer(config.Database, config.Collection);
FeedIterator<Dictionary<string, string>> changeFeedPull = container.GetChangeFeedIterator<Dictionary<string, string>>
(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental);
using FeedIterator<Dictionary<string, string>> changeFeedPull
= container.GetChangeFeedIterator<Dictionary<string, string>>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.Incremental);

try
{
while (changeFeedPull.HasMoreResults)
Expand All @@ -71,65 +77,28 @@ public async Task RunAsync(
}
}

if (config.PreCreatedDocuments == documentTotal)
{
logger.LogInformation($"Success: The number of new documents match the number of pre-created documents: {config.PreCreatedDocuments}");
}
else
metrics.Measure.Gauge.SetValue(documentGauge, documentTotal);

if (config.PreCreatedDocuments > 0)
{
logger.LogError($"The prepopulated documents and the new documents don't match. Preconfigured Docs = {config.PreCreatedDocuments}, New Documents = {documentTotal}");
logger.LogError(continuation);
if (this.initializationResult.InsertedDocuments == documentTotal)
{
logger.LogInformation($"Success: The number of new documents match the number of pre-created documents: {this.initializationResult.InsertedDocuments}");
}
else
{
logger.LogError($"The prepopulated documents and the change feed documents don't match. Preconfigured Docs = {this.initializationResult.InsertedDocuments}, Change feed Documents = {documentTotal}.{Environment.NewLine}{continuation}");
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "Failure while looping through new documents");
metrics.Measure.Gauge.SetValue(documentGauge, documentTotal);
logger.LogError(ex, "Failure while looping through change feed documents");
}
}

stopWatch.Stop();
}
private static async Task<InitializationResult> CreateDatabaseAndContainerAsync(
CTLConfig config,
CosmosClient cosmosClient)
{
InitializationResult result = new InitializationResult()
{
CreatedDatabase = false,
CreatedContainer = false
};

Database database;

try
{
database = await cosmosClient.GetDatabase(config.Database).ReadAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
DatabaseResponse databaseResponse = await cosmosClient.CreateDatabaseAsync(config.Database, config.Throughput);
result.CreatedDatabase = true;
database = databaseResponse.Database;
}

Container container;

try
{
container = await database.GetContainer(config.Collection).ReadContainerAsync();
}
catch (CosmosException exception) when (exception.StatusCode == System.Net.HttpStatusCode.NotFound)
{
await database.CreateContainerAsync(config.Collection, $"/{config.CollectionPartitionKey}");
result.CreatedContainer = true;
}

return result;
}
private struct InitializationResult
{
public bool CreatedDatabase;
public bool CreatedContainer;
}
}
}
Loading

0 comments on commit f5e1ae6

Please sign in to comment.